Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java?rev=936389&r1=936388&r2=936389&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java (original) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java Wed Apr 21 16:39:23 2010 @@ -27,11 +27,18 @@ import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import javax.sql.DataSource; +import javax.xml.namespace.QName; + +import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.bpel.iapi.Scheduler.JobDetails; +import org.apache.ode.utils.DbIsolation; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,7 +59,7 @@ public class JdbcDelegate implements Dat private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?"; - private static final String UPDATE_JOB = "update ODE_JOB set ts = ?, details = ? where jobid = ?"; + private static final String UPDATE_JOB = "update ODE_JOB set ts = ?, retryCount = ? where jobid = ?"; private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null " + "and mod(ts,?) = ? and ts < ?"; @@ -70,12 +77,60 @@ public class JdbcDelegate implements Dat + "and -1 <> ? and -1 <> ? and ts < ?"; private static final String SAVE_JOB = "insert into ODE_JOB " - + " (jobid, nodeid, ts, scheduled, transacted, details) values(?, ?, ?, ?, ?, ?)"; + + " (jobid, nodeid, ts, scheduled, transacted, " + + "instanceId," + + "mexId," + + "processId," + + "type," + + "channel," + + "correlatorId," + + "correlationKeySet," + + "retryCount," + + "inMem," + + "detailsExt" + + ") values(?, ?, ?, ?, ?," + + "?," + + "?," + + "?," + + "?," + + "?," + + "?," + + "?," + + "?," + + "?," + + "?" + + ")"; private static final String GET_NODEIDS = "select distinct nodeid from ODE_JOB"; - private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, transacted, scheduled, details from ODE_JOB " - + "where nodeid = ? and ts < ? order by ts"; + private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, transacted, scheduled, " + + "instanceId," + + "mexId," + + "processId," + + "type," + + "channel," + + "correlatorId," + + "correlationKeySet," + + "retryCount," + + "inMem," + + "detailsExt" + + " from ODE_JOB " + + "where nodeid = ? and scheduled = 0 and ts < ? order by ts"; + +// public Long instanceId; +// public String mexId; +// public String processId; +// public String type; +// public String channel; +// public String correlatorId; +// public String correlationKeySet; +// public Integer retryCount; +// public Boolean inMem; +// public Map<String, Object> detailsExt = new HashMap<String, Object>(); + + private static final String UPDATE_SCHEDULED = "update ODE_JOB set scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + private static final int UPDATE_SCHEDULED_SLOTS = 10; private DataSource _ds; @@ -137,21 +192,39 @@ public class JdbcDelegate implements Dat Connection con = null; PreparedStatement ps = null; try { + int i = 1; con = getConnection(); ps = con.prepareStatement(SAVE_JOB); - ps.setString(1, job.jobId); - ps.setString(2, nodeId); - ps.setLong(3, job.schedDate); - ps.setInt(4, asInteger(loaded)); - ps.setInt(5, asInteger(job.transacted)); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - StreamUtils.write(bos, (Serializable) job.detail); - } catch (Exception ex) { - __log.error("Error serializing job detail: " + job.detail); - throw new DatabaseException(ex); + ps.setString(i++, job.jobId); + ps.setString(i++, nodeId); + ps.setLong(i++, job.schedDate); + ps.setInt(i++, asInteger(loaded)); + ps.setInt(i++, asInteger(job.transacted)); + + JobDetails details = job.detail; + ps.setObject(i++, details.instanceId, Types.BIGINT); + ps.setObject(i++, details.mexId, Types.VARCHAR); + ps.setObject(i++, details.processId, Types.VARCHAR); + ps.setObject(i++, details.type, Types.VARCHAR); + ps.setObject(i++, details.channel, Types.VARCHAR); + ps.setObject(i++, details.correlatorId, Types.VARCHAR); + ps.setObject(i++, details.correlationKeySet, Types.VARCHAR); + ps.setObject(i++, details.retryCount, Types.INTEGER); + ps.setObject(i++, details.inMem, Types.INTEGER); + + if (details.detailsExt == null || details.detailsExt.size() == 0) { + ps.setObject(i++, null, Types.BLOB); + } else { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + StreamUtils.write(bos, (Serializable) details.detailsExt); + } catch (Exception ex) { + __log.error("Error serializing job detail: " + job.detail); + throw new DatabaseException(ex); + } + ps.setBytes(i++, bos.toByteArray()); } - ps.setBytes(6, bos.toByteArray()); + return ps.executeUpdate() == 1; } catch (SQLException se) { throw new DatabaseException(se); @@ -163,7 +236,7 @@ public class JdbcDelegate implements Dat public boolean updateJob(Job job) throws DatabaseException { if (__log.isDebugEnabled()) - __log.debug("updateJob " + job.jobId + " details=" + job); + __log.debug("updateJob " + job.jobId + " retryCount=" + job.detail.getRetryCount()); Connection con = null; PreparedStatement ps = null; @@ -171,14 +244,7 @@ public class JdbcDelegate implements Dat con = getConnection(); ps = con.prepareStatement(UPDATE_JOB); ps.setLong(1, job.schedDate); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - StreamUtils.write(bos, (Serializable) job.detail); - } catch (Exception ex) { - __log.error("Error serializing job detail: " + job.detail); - throw new DatabaseException(ex); - } - ps.setBytes(2, bos.toByteArray()); + ps.setInt(2, job.detail.getRetryCount()); ps.setString(3, job.jobId); return ps.executeUpdate() == 1; } catch (SQLException se) { @@ -200,16 +266,64 @@ public class JdbcDelegate implements Dat ps.setString(1, nodeId); ps.setLong(2, maxtime); ps.setMaxRows(maxjobs); + ResultSet rs = ps.executeQuery(); while (rs.next()) { - Map<String, Object> details; - try { - ObjectInputStream is = new ObjectInputStream(rs.getBinaryStream(5)); - details = (Map<String, Object>) is.readObject(); - is.close(); - } catch (Exception e) { - throw new DatabaseException("Error deserializing job details", e); + Scheduler.JobDetails details = new Scheduler.JobDetails(); + details.instanceId = (Long) rs.getObject("instanceId"); + details.mexId = (String) rs.getObject("mexId"); + details.processId = (String) rs.getObject("processId"); + details.type = (String) rs.getObject("type"); + details.channel = (String) rs.getObject("channel"); + details.correlatorId = (String) rs.getObject("correlatorId"); + details.correlationKeySet = (String) rs.getObject("correlationKeySet"); + details.retryCount = (Integer) rs.getObject("retryCount"); + details.inMem = (Boolean) rs.getObject("inMem"); + if (rs.getObject("detailsExt") != null) { + try { + ObjectInputStream is = new ObjectInputStream(rs.getBinaryStream("detailsExt")); + details.detailsExt = (Map<String, Object>) is.readObject(); + is.close(); + } catch (Exception e) { + throw new DatabaseException("Error deserializing job detailsExt", e); + } + } + + { + //For compatibility reasons, we check whether there are entries inside + //jobDetailsExt blob, which correspond to extracted entries. If so, we + //use them. + + Map<String, Object> detailsExt = details.getDetailsExt(); + if (detailsExt.get("type") != null) { + details.type = (String) detailsExt.get("type"); + } + if (detailsExt.get("iid") != null) { + details.instanceId = (Long) detailsExt.get("iid"); + } + if (detailsExt.get("pid") != null) { + details.processId = (String) detailsExt.get("pid"); + } + if (detailsExt.get("inmem") != null) { + details.inMem = (Boolean) detailsExt.get("inmem"); + } + if (detailsExt.get("ckey") != null) { + details.correlationKeySet = (String) detailsExt.get("ckey"); + } + if (detailsExt.get("channel") != null) { + details.channel = (String) detailsExt.get("channel"); + } + if (detailsExt.get("mexid") != null) { + details.mexId = (String) detailsExt.get("mexid"); + } + if (detailsExt.get("correlatorId") != null) { + details.correlatorId = (String) detailsExt.get("correlatorId"); + } + if (detailsExt.get("retryCount") != null) { + details.retryCount = Integer.parseInt((String) detailsExt.get("retryCount")); + } } + Job job = new Job(rs.getLong(2), rs.getString(1), asBoolean(rs.getInt(3)), details); ret.add(job); }
Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java?rev=936389&r1=936388&r2=936389&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java (original) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java Wed Apr 21 16:39:23 2010 @@ -22,6 +22,7 @@ package org.apache.ode.scheduler.simple; import java.util.Map; import java.text.SimpleDateFormat; +import org.apache.ode.bpel.iapi.Scheduler.JobDetails; import org.apache.ode.utils.GUID; /** @@ -34,14 +35,14 @@ class Job extends Task { String jobId; boolean transacted; - Map<String,Object> detail; + JobDetails detail; boolean persisted = true; - public Job(long when, boolean transacted, Map<String, Object> jobDetail) { + public Job(long when, boolean transacted, JobDetails jobDetail) { this(when, new GUID().toString(),transacted,jobDetail); } - - public Job(long when, String jobId, boolean transacted,Map<String, Object> jobDetail) { + + public Job(long when, String jobId, boolean transacted, JobDetails jobDetail) { super(when); this.jobId = jobId; this.detail = jobDetail; Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=936389&r1=936388&r2=936389&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Apr 21 16:39:23 2010 @@ -33,12 +33,15 @@ import javax.transaction.Synchronization import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; +import javax.xml.namespace.QName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.common.CorrelationKey; import org.apache.ode.bpel.iapi.ContextException; import org.apache.ode.bpel.iapi.Scheduler; import org.apache.log4j.helpers.AbsoluteTimeDateFormat; +import org.apache.ode.bpel.iapi.Scheduler.JobType; /** * A reliable and relatively simple scheduler that uses a database to persist information about @@ -340,7 +343,7 @@ public class SimpleScheduler implements } } - public String schedulePersistedJob(final Map<String, Object> jobDetail, Date when) throws ContextException { + public String schedulePersistedJob(final JobDetails jobDetail, Date when) throws ContextException { long ctime = System.currentTimeMillis(); if (when == null) when = new Date(ctime); @@ -356,10 +359,10 @@ public class SimpleScheduler implements if (when == null) when = new Date(ctime); - Map<String, Object> jobDetails = new HashMap<String, Object>(); - jobDetails.put("runnable", runnable); - runnable.storeToDetailsMap(jobDetails); - + JobDetails jobDetails = new JobDetails(); + jobDetails.getDetailsExt().put("runnable", runnable); + runnable.storeToDetails(jobDetails); + if (__log.isDebugEnabled()) __log.debug("scheduling " + jobDetails + " for " + when); @@ -396,11 +399,11 @@ public class SimpleScheduler implements return job.jobId; } - public String scheduleVolatileJob(boolean transacted, Map<String, Object> jobDetail) throws ContextException { + public String scheduleVolatileJob(boolean transacted, JobDetails jobDetail) throws ContextException { return scheduleVolatileJob(transacted, jobDetail, null); } - public String scheduleVolatileJob(boolean transacted, Map<String, Object> jobDetail, Date when) throws ContextException { + public String scheduleVolatileJob(boolean transacted, JobDetails jobDetail, Date when) throws ContextException { long ctime = System.currentTimeMillis(); if (when == null) when = new Date(ctime); @@ -502,7 +505,7 @@ public class SimpleScheduler implements public Void call() throws Exception { try { final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail, - (Integer) (job.detail.get("retry") != null ? job.detail.get("retry") : 0)); + job.detail.getRetryCount()); if (job.transacted) { final boolean[] needRetry = new boolean[]{true}; try { @@ -514,7 +517,7 @@ public class SimpleScheduler implements try { processor.onScheduledJob(jobInfo); // If the job is a "runnable" job, schedule the next job occurence - if (job.detail.get("runnable") != null && !"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.get("runnable_status")))) { + if (job.detail.getDetailsExt().get("runnable") != null && !"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.getDetailsExt().get("runnable_status")))) { // the runnable is still in progress, schedule checker to 10 mins later if (_pollIntervalForPolledRunnable < 0) { if (__log.isWarnEnabled()) @@ -547,9 +550,9 @@ public class SimpleScheduler implements execTransaction(new Callable<Void>() { public Void call() throws Exception { if (needRetry[0]) { - int retry = job.detail.get("retry") != null ? (((Integer) job.detail.get("retry")) + 1) : 0; + int retry = job.detail.getRetryCount() + 1; if (retry <= 10) { - job.detail.put("retry", retry); + job.detail.setRetryCount(retry); long delay = (long)(Math.pow(5, retry)); job.schedDate = System.currentTimeMillis() + delay*1000; _db.updateJob(job); @@ -643,7 +646,7 @@ public class SimpleScheduler implements public void runTask(final Task task) { if (task instanceof Job) { Job job = (Job)task; - if( job.detail.get("runnable") != null ) { + if( job.detail.getDetailsExt().get("runnable") != null ) { runPolledRunnable(job); } else { runJob(job); @@ -811,9 +814,17 @@ public class SimpleScheduler implements } finally { __log.debug("node recovery complete"); } - } +// private long doRetry(Job job) throws DatabaseException { +// int retry = job.detail.getRetryCount() + 1; +// job.detail.setRetryCount(retry); +// long delay = (long)(Math.pow(5, retry - 1)); +// Job jobRetry = new Job(System.currentTimeMillis() + delay*1000, true, job.detail); +// _db.insertJob(jobRetry, _nodeId, false); +// return delay; +// } + private abstract class SchedulerTask extends Task implements Runnable { SchedulerTask(long schedDate) { super(schedDate); Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java?rev=936389&r1=936388&r2=936389&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java (original) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java Wed Apr 21 16:39:23 2010 @@ -19,6 +19,7 @@ package org.apache.ode.scheduler.simple; +import java.io.InputStream; import java.sql.Connection; import javax.sql.DataSource; @@ -64,9 +65,18 @@ public class DelegateSupport { public void setup() throws Exception { Connection c = _ds.getConnection(); try { + StringBuffer sql = new StringBuffer(); + + { + InputStream in = getClass().getResourceAsStream("/simplesched-hsql.sql"); + int v; + while ((v = in.read()) != -1) { + sql.append((char) v); + } + } + c.createStatement().executeUpdate("CREATE ALIAS MOD FOR \"org.apache.ode.scheduler.simple.DelegateSupport.mod\";"); - String sql = "CREATE TABLE \"ODE_JOB\" (\"JOBID\" CHAR(64) NOT NULL, \"TS\" NUMERIC NOT NULL, \"NODEID\" char(64) NULL, \"SCHEDULED\" int NOT NULL, \"TRANSACTED\" int NOT NULL, \"DETAILS\" BINARY(4096) NULL, PRIMARY KEY(\"JOBID\"));"; - c.createStatement().executeUpdate(sql); + c.createStatement().executeUpdate(sql.toString()); } finally { c.close(); } Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java?rev=936389&r1=936388&r2=936389&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java (original) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java Wed Apr 21 16:39:23 2010 @@ -22,6 +22,10 @@ package org.apache.ode.scheduler.simple; import java.util.HashMap; import java.util.List; +import javax.xml.namespace.QName; + +import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.bpel.iapi.Scheduler.JobType; import org.apache.ode.scheduler.simple.DatabaseDelegate; import org.apache.ode.scheduler.simple.Job; @@ -52,19 +56,19 @@ public class JdbcDelegateTest extends Te assertEquals(0, nids.size()); // try for one nodeid - _del.insertJob(new Job(0L,true,new HashMap<String, Object>()), "abc", true); + _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "abc", true); nids = _del.getNodeIds(); assertEquals(1, nids.size()); assertTrue(nids.contains("abc")); // check that dups are ignored. - _del.insertJob(new Job(0L,true,new HashMap<String, Object>()), "abc", true); + _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "abc", true); nids = _del.getNodeIds(); assertEquals(1, nids.size()); assertTrue(nids.contains("abc")); // add another nodeid, - _del.insertJob(new Job(0L,true,new HashMap<String, Object>()), "123", true); + _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "123", true); nids = _del.getNodeIds(); assertEquals(2, nids.size()); assertTrue(nids.contains("abc")); @@ -72,8 +76,8 @@ public class JdbcDelegateTest extends Te } public void testReassign() throws Exception { - _del.insertJob(new Job(100L,"j1",true,new HashMap<String, Object>()), "n1", false); - _del.insertJob(new Job(200L,"j2",true,new HashMap<String, Object>()), "n2", false); + _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false); + _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n2", false); assertEquals(1,_del.updateReassign("n1","n2")); List<Job> jobs = _del.dequeueImmediate("n2", 400L, 1000); @@ -81,8 +85,8 @@ public class JdbcDelegateTest extends Te } public void testScheduleImmediateTimeFilter() throws Exception { - _del.insertJob(new Job(100L,"j1",true,new HashMap<String, Object>()), "n1", false); - _del.insertJob(new Job(200L,"j2",true,new HashMap<String, Object>()), "n1", false); + _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false); + _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n1", false); List<Job> jobs = _del.dequeueImmediate("n1", 150L, 1000); @@ -97,8 +101,8 @@ public class JdbcDelegateTest extends Te } public void testScheduleImmediateMaxRows() throws Exception { - _del.insertJob(new Job(100L,"j1",true,new HashMap<String, Object>()), "n1", false); - _del.insertJob(new Job(200L,"j2",true,new HashMap<String, Object>()), "n1", false); + _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false); + _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n1", false); List<Job> jobs = _del.dequeueImmediate("n1", 201L, 1); assertNotNull(jobs); @@ -107,8 +111,8 @@ public class JdbcDelegateTest extends Te } public void testScheduleImmediateNodeFilter() throws Exception { - _del.insertJob(new Job(100L,"j1",true,new HashMap<String, Object>()), "n1", false); - _del.insertJob(new Job(200L,"j2",true,new HashMap<String, Object>()), "n2", false); + _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false); + _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n2", false); List<Job> jobs = _del.dequeueImmediate("n2", 300L, 1000); assertNotNull(jobs); @@ -117,8 +121,8 @@ public class JdbcDelegateTest extends Te } public void testDeleteJob() throws Exception { - _del.insertJob(new Job(100L,"j1",true,new HashMap<String, Object>()), "n1", false); - _del.insertJob(new Job(200L,"j2",true,new HashMap<String, Object>()), "n2", false); + _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false); + _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n2", false); // try deleting, wrong jobid -- del should fail assertFalse(_del.deleteJob("j1x", "n1")); @@ -135,7 +139,7 @@ public class JdbcDelegateTest extends Te public void testUpgrade() throws Exception { for (int i = 0; i < 200; ++i) - _del.insertJob(new Job(i ,"j" +i,true,new HashMap<String, Object>()), null, false); + _del.insertJob(new Job(i ,"j" +i,true,new Scheduler.JobDetails()), null, false); int n1 = _del.updateAssignToNode("n1", 0, 3, 100); int n2 = _del.updateAssignToNode("n2", 1, 3, 100); @@ -149,4 +153,30 @@ public class JdbcDelegateTest extends Te assertEquals(n3,_del.dequeueImmediate("n3", 10000L, 1000).size()); } + public void testMigration() throws Exception { + Scheduler.JobDetails j1 = new Scheduler.JobDetails(); + j1.getDetailsExt().put("type", "MATCHER"); + j1.getDetailsExt().put("iid", 1234L); + j1.getDetailsExt().put("pid", new QName("http://test1", "test2").toString()); + j1.getDetailsExt().put("inmem", true); + j1.getDetailsExt().put("ckey", "@2[some~001~002]"); + j1.getDetailsExt().put("channel", "123"); + j1.getDetailsExt().put("mexid", "mexid123"); + j1.getDetailsExt().put("correlatorId", "cid123"); + j1.getDetailsExt().put("retryCount", "15"); + + _del.insertJob(new Job(0 ,"migration",true,j1), null, false); + _del.updateAssignToNode("m", 0, 3, 100); + Scheduler.JobDetails j2 = _del.dequeueImmediate("m", 10000L, 1000).get(0).detail; + + assertEquals(j2.getType(), JobType.MATCHER); + assertEquals(j2.getInstanceId(), (Object) 1234L); + assertEquals(j2.getProcessId(), new QName("http://test1", "test2")); + assertEquals(j2.getInMem(), (Object) true); + assertEquals(j2.getCorrelationKeySet().toCanonicalString(), (Object) "@2[some~001~002]"); + assertEquals(j2.getChannel(), (Object) "123"); + assertEquals(j2.getMexId(), (Object) "mexid123"); + assertEquals(j2.getCorrelatorId(), (Object) "cid123"); + assertEquals(j2.getRetryCount(), (Object) 15); + } } Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=936389&r1=936388&r2=936389&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (original) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Wed Apr 21 16:39:23 2010 @@ -20,6 +20,8 @@ package org.apache.ode.scheduler.simple; import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; import javax.transaction.TransactionManager; @@ -32,6 +34,8 @@ import junit.framework.TestCase; * @author Matthieu Riou <[email protected]> */ public class RetriesTest extends TestCase implements Scheduler.JobProcessor { + private static final Log __log = LogFactory.getLog(RetriesTest.class); + DelegateSupport _ds; SimpleScheduler _scheduler; ArrayList<Scheduler.JobInfo> _jobs; @@ -66,7 +70,7 @@ public class RetriesTest extends TestCas } Thread.sleep(10000); - assertEquals(8, _tried); + assertEquals(4, _tried); } public void testExecTransaction() throws Exception { @@ -95,10 +99,10 @@ public class RetriesTest extends TestCas throw new Scheduler.JobProcessorException(jobInfo.retryCount < 1); } - Map<String, Object> newDetail(String x) { - HashMap<String, Object> det = new HashMap<String, Object>(); - det.put("foo", x); - return det; + Scheduler.JobDetails newDetail(String x) { + Scheduler.JobDetails jd = new Scheduler.JobDetails(); + jd.getDetailsExt().put("foo", x); + return jd; } private SimpleScheduler newScheduler(String nodeId) { Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=936389&r1=936388&r2=936389&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Wed Apr 21 16:39:23 2010 @@ -29,6 +29,8 @@ import javax.transaction.TransactionMana import junit.framework.TestCase; +import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; +import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.bpel.iapi.Scheduler.JobInfo; import org.apache.ode.bpel.iapi.Scheduler.JobProcessor; import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException; @@ -238,10 +240,10 @@ public class SimpleSchedulerTest extends } - Map<String, Object> newDetail(String x) { - HashMap<String, Object> det = new HashMap<String, Object>(); - det.put("foo", x); - return det; + Scheduler.JobDetails newDetail(String x) { + Scheduler.JobDetails jd = new Scheduler.JobDetails(); + jd.getDetailsExt().put("foo", x); + return jd; } private SimpleScheduler newScheduler(String nodeId) { Added: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/log4j.properties?rev=936389&view=auto ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/log4j.properties (added) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/log4j.properties Wed Apr 21 16:39:23 2010 @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to WARN and its only appender to CONSOLE +log4j.rootLogger=WARN, CONSOLE + +# log4j properties to work with commandline tools. +log4j.category.org.apache.ode.scheduler.simple.RetriesTest=DEBUG +log4j.category.org.apache.ode.bpel.engine=INFO + +# Console appender +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%p - %C{1}.%M(%L) | %m%n Propchange: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/log4j.properties ------------------------------------------------------------------------------ svn:eol-style = native Added: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/simplesched-hsql.sql URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/simplesched-hsql.sql?rev=936389&view=auto ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/simplesched-hsql.sql (added) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/simplesched-hsql.sql Wed Apr 21 16:39:23 2010 @@ -0,0 +1,20 @@ +CREATE TABLE ode_job ( + jobid CHAR(64) NOT NULL, + ts BIGINT NOT NULL, + nodeid char(64), + scheduled int NOT NULL, + transacted int NOT NULL, + + instanceId BIGINT, + mexId varchar(255), + processId varchar(255), + type varchar(255), + channel varchar(255), + correlatorId varchar(255), + correlationKeySet varchar(255), + retryCount int, + inMem int, + detailsExt binary(4096), + + PRIMARY KEY(jobid)); + Propchange: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/resources/simplesched-hsql.sql ------------------------------------------------------------------------------ svn:eol-style = native
