Author: mriou
Date: Wed Sep  3 10:32:51 2008
New Revision: 691693

URL: http://svn.apache.org/viewvc?rev=691693&view=rev
Log:
Canceling invocation check on reply.

Modified:
    ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java
URL: http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java (original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java Wed Sep  3 10:32:51 2008
@@ -265,7 +265,6 @@
             __log.error(emsg, e);
 
         }
-
     }
 
     private void reply(final PartnerRoleMessageExchange odeMex, final Operation operation, final MessageContext reply, final boolean isFault) {

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Sep  3 10:32:51 2008
@@ -46,20 +46,7 @@
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.evar.ExternalVariableModule;
 import org.apache.ode.bpel.evt.BpelEvent;
-import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.BpelEventListener;
-import org.apache.ode.bpel.iapi.BpelServer;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
@@ -520,7 +507,6 @@
                 // doing any work on its behalf, therefore we will reschedule the
                 // events for some time in the future (1 minute).
                 _contexts.execTransaction(new Callable<Void>() {
-
                     public Void call() throws Exception {
                         _contexts.scheduler.jobCompleted(jobInfo.jobName);
                         Date future = new Date(System.currentTimeMillis() + (60 * 1000));
@@ -532,6 +518,19 @@
                 });
                 return;
             }
+            
+            if (we.getType().equals(WorkEvent.Type.INVOKE_CHECK)) {
+                if (__log.isDebugEnabled()) __log.debug("handleWorkEvent: InvokeCheck event for mexid " + we.getMexId());
+
+                PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange) getMessageExchange(we.getMexId());
+                if (mex.getStatus() == MessageExchange.Status.ASYNC || mex.getStatus() == MessageExchange.Status.ACK) {
+                    String msg = "Dangling invocation (mexId=" + we.getMexId() + "), forcing it into a failed state.";
+                    if (__log.isDebugEnabled()) __log.debug(msg);
+                    mex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, msg, null);
+                }
+                return;
+            }
+
             process.handleWorkEvent(jobInfo);
         } catch (Exception ex) {
             throw new JobProcessorException(ex, jobInfo.jobDetail.get("inmem") == null);

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java Wed Sep  3 10:32:51 2008
@@ -421,6 +421,9 @@
 
         OdeRTInstance rti = _runtime.newInstance(getState(mexdao.getInstance()));
         BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), rti);
+        // Canceling invoke check
+        String jobId = mexdao.getProperty("invokeCheckJobId");
+        _contexts.scheduler.cancelJob(jobId);        
 
         brc.injectPartnerResponse(mexdao.getMessageExchangeId(), mexdao.getChannel());
         brc.execute();
@@ -498,7 +501,6 @@
                 _contexts.scheduler.jobCompleted(jobInfo.jobName);
                 execInstanceEvent(we);
             }
-
         });
 
     }
@@ -1192,11 +1194,11 @@
 
     }
 
-    public void scheduleWorkEvent(WorkEvent we, Date timeToFire) {
+    public String scheduleWorkEvent(WorkEvent we, Date timeToFire) {
         // if (isInMemory())
         // throw new InvalidProcessException("In-mem process execution resulted in event scheduling.");
 
-        _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
+        return _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
     }
 
     void invokePartner(MessageExchangeDAO mexdao) {
@@ -1233,6 +1235,9 @@
                 }
             } else {
                 partnerRole.invokeIL(mexdao);
+                // Scheduling a verification to see if the invoke has really been processed. Otherwise
+                // we put it in activity recovery mode (case of a server crash during invocation).
+                scheduleInvokeCheck(mexdao);
             }
         } finally {
             if (mexdao.getStatus() != Status.ACK)
@@ -1243,6 +1248,21 @@
         assert mexdao.getStatus() == Status.ACK || mexdao.getStatus() == Status.ASYNC;
     }
 
+    private void scheduleInvokeCheck(MessageExchangeDAO mex) {
+        boolean isTwoWay = mex.getPattern() ==
+                org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
+        if (!isInMemory() && isTwoWay) {
+            if (__log.isDebugEnabled()) __log.debug("Creating invocation check event for mexid " + mex.getMessageExchangeId());
+            WorkEvent event = new WorkEvent();
+            event.setMexId(mex.getMessageExchangeId());
+            event.setProcessId(getPID());
+            event.setType(WorkEvent.Type.INVOKE_CHECK);
+            Date future = new Date(System.currentTimeMillis() + (180 * 1000));
+            String jobId = scheduleWorkEvent(event, future);
+            mex.setProperty("invokeCheckJobId", jobId);
+        }
+    }
+
     /**
      * Invoke a partner process directly (via the engine), bypassing the Integration Layer. Obviously this can only be used when an
      * process is partners with another process hosted on the same engine.

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Wed Sep  3 10:32:51 2008
@@ -76,7 +76,9 @@
         /** Invoke a "my role" operation (i.e. implemented by the process). */
         MYROLE_INVOKE, 
         
-        MYROLE_INVOKE_ASYNC_RESPONSE
+        MYROLE_INVOKE_ASYNC_RESPONSE,
+
+        INVOKE_CHECK
     }
 
     public String getChannel() {

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java Wed Sep  3 10:32:51 2008
@@ -25,7 +25,7 @@
 
 /**
  * Like a task, but a little bit better.
- * 
+ *
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  */
 class Job extends Task {
@@ -37,7 +37,7 @@
     public Job(long when, boolean transacted, Map<String, Object> jobDetail) {
         this(when, new GUID().toString(),transacted,jobDetail);
     }
-    
+
     public Job(long when, String jobId, boolean transacted,Map<String, Object> jobDetail) {
         super(when);
         this.jobId = jobId;
@@ -45,5 +45,14 @@
         this.transacted = transacted;
     }
 
-    
+    @Override
+    public int hashCode() {
+        return jobId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof Job && jobId.equals(((Job) obj).jobId);
+    }
+
 }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java Wed Sep  3 10:32:51 2008
@@ -31,152 +31,168 @@
 
 /**
  * Implements the "todo" queue and prioritized scheduling mechanism. 
- * 
+ *
  * @author mszefler
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- * 
+ *
  */
 class SchedulerThread implements Runnable {
 
-       private static final Log __log = LogFactory.getLog(SchedulerThread.class);
+    private static final Log __log = LogFactory.getLog(SchedulerThread.class);
 
-       private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
+    private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
 
-       /** Jobs ready for immediate execution. */
-       private PriorityBlockingQueue<Task> _todo;
+    /** Jobs ready for immediate execution. */
+    private PriorityBlockingQueue<Task> _todo;
 
-       /** Lock for managing the queue */
-       private ReentrantLock _lock = new ReentrantLock();
-
-       private Condition _activity = _lock.newCondition();
-
-       private volatile boolean _done;
-
-       private TaskRunner _taskrunner;
-
-       private Thread _thread;
-
-       SchedulerThread(TaskRunner runner) {
-               _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
-                               new JobComparatorByDate());
-               _taskrunner = runner;
-       }
-
-       void start() {
-               if (_thread != null)
-                       return;
-
-               _done = false;
-               _thread = new Thread(this, "OdeScheduler");
-               _thread.start();
-       }
-
-       /**
-        * Shutdown the thread.
-        */
-       void stop() {
-               if (_thread == null)
-                       return;
-
-               _done = true;
-               _lock.lock();
-               try {
-                       _activity.signal();
-               } finally {
-                       _lock.unlock();
-
-               }
-
-               while (_thread != null)
-                       try {
-                               _thread.join();
-                               _thread = null;
-                       } catch (InterruptedException e) {
-                               ;
-                       }
-
-       }
-
-       /**
-        * Add a job to the todo queue.
-        * 
-        * @param job
-        */
-       void enqueue(Task task) {
-               _lock.lock();
-               try {
-                       _todo.add(task);
-                       _activity.signal();
-               } finally {
-                       _lock.unlock();
-               }
-       }
-
-       /**
-        * Get the size of the todo queue.
-        * 
-        * @return
-        */
-       public int size() {
-               return _todo.size();
-       }
-
-       /**
-        * Pop items off the todo queue, and send them to the task runner for processing.
-        */
-       public void run() {
-               while (!_done) {
-                       _lock.lock();
-                       try {
-                               long nextjob;
-                               while ((nextjob = nextJobTime()) > 0 && !_done)
-                                       _activity.await(nextjob, TimeUnit.MILLISECONDS);
-
-                               if (!_done && nextjob == 0) {
-                                       Task task = _todo.take();
-                                       _taskrunner.runTask(task);
-
-                               }
-                       } catch (InterruptedException ex) {
-                               ; // ignore
-                       } finally {
-                               _lock.unlock();
-                       }
-               }
-       }
-
-       /**
-        * Calculate the time until the next available job.
-        * 
-        * @return time until next job, 0 if one is one is scheduled to go, and some
-        *         really large number if there are no jobs to speak of
-        */
-       private long nextJobTime() {
-               assert _lock.isLocked();
-
-               Task job = _todo.peek();
-               if (job == null)
-                       return Long.MAX_VALUE;
-
-               return Math.max(0, job.schedDate - System.currentTimeMillis());
-       }
-
-       /**
-        * Remove the tasks of a given type from the list. 
-        * @param tasktype type of task
-        */
-       public void clearTasks(final Class<? extends Task> tasktype) {
-               _lock.lock();
-               try {
-                       CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
-                               @Override
-                               public boolean isMember(Task o) {
-                                       return tasktype.isAssignableFrom(o.getClass());
-                               }
-
-                       });
-               } finally {
-                       _lock.unlock();
-               }
-       }
+    /** Lock for managing the queue */
+    private ReentrantLock _lock = new ReentrantLock();
+
+    private Condition _activity = _lock.newCondition();
+
+    private volatile boolean _done;
+
+    private TaskRunner _taskrunner;
+
+    private Thread _thread;
+
+    SchedulerThread(TaskRunner runner) {
+        _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
+                new JobComparatorByDate());
+        _taskrunner = runner;
+    }
+
+    void start() {
+        if (_thread != null)
+            return;
+
+        _done = false;
+        _thread = new Thread(this, "OdeScheduler");
+        _thread.start();
+    }
+
+    /**
+     * Shutdown the thread.
+     */
+    void stop() {
+        if (_thread == null)
+            return;
+
+        _done = true;
+        _lock.lock();
+        try {
+            _activity.signal();
+        } finally {
+            _lock.unlock();
+
+        }
+
+        while (_thread != null)
+            try {
+                _thread.join();
+                _thread = null;
+            } catch (InterruptedException e) {
+                ;
+            }
+
+    }
+
+    /**
+     * Add a job to the todo queue.
+     *
+     * @param job
+     */
+    void enqueue(Task task) {
+        _lock.lock();
+        try {
+            _todo.add(task);
+            _activity.signal();
+        } finally {
+            _lock.unlock();
+        }
+    }
+
+    /**
+     * Remove a job to the todo queue.
+     *
+     * @param job
+     */
+    void dequeue(Task task) {
+        _lock.lock();
+        try {
+            _todo.remove(task);
+            _activity.signal();
+        } finally {
+            _lock.unlock();
+        }
+    }
+
+
+    /**
+     * Get the size of the todo queue.
+     *
+     * @return
+     */
+    public int size() {
+        return _todo.size();
+    }
+
+    /**
+     * Pop items off the todo queue, and send them to the task runner for processing.
+     */
+    public void run() {
+        while (!_done) {
+            _lock.lock();
+            try {
+                long nextjob;
+                while ((nextjob = nextJobTime()) > 0 && !_done)
+                    _activity.await(nextjob, TimeUnit.MILLISECONDS);
+
+                if (!_done && nextjob == 0) {
+                    Task task = _todo.take();
+                    _taskrunner.runTask(task);
+
+                }
+            } catch (InterruptedException ex) {
+                ; // ignore
+            } finally {
+                _lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Calculate the time until the next available job.
+     *
+     * @return time until next job, 0 if one is one is scheduled to go, and some
+     *         really large number if there are no jobs to speak of
+     */
+    private long nextJobTime() {
+        assert _lock.isLocked();
+
+        Task job = _todo.peek();
+        if (job == null)
+            return Long.MAX_VALUE;
+
+        return Math.max(0, job.schedDate - System.currentTimeMillis());
+    }
+
+    /**
+     * Remove the tasks of a given type from the list.
+     * @param tasktype type of task
+     */
+    public void clearTasks(final Class<? extends Task> tasktype) {
+        _lock.lock();
+        try {
+            CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
+                @Override
+                public boolean isMember(Task o) {
+                    return tasktype.isAssignableFrom(o.getClass());
+                }
+
+            });
+        } finally {
+            _lock.unlock();
+        }
+    }
 }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Sep  3 10:32:51 2008
@@ -132,7 +132,13 @@
     }
 
     public void cancelJob(String jobId) throws ContextException {
-        // TODO: maybe later, not really necessary.
+        _todo.dequeue(new Job(0, jobId, false, null));
+        try {
+            _db.deleteJob(jobId, _nodeId);
+        } catch (DatabaseException e) {
+            __log.debug("Job removal failed.", e);
+            throw new ContextException("Job removal failed.", e);
+        }
     }
 
     public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {


Reply via email to