Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HScope.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HScope.java?rev=766592&r1=766591&r2=766592&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HScope.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HScope.java Mon Apr 20 06:44:37 2009 @@ -25,151 +25,148 @@ /** * Hibernate table representing a BPEL scope instance. * @hibernate.class table="BPEL_SCOPE" - * @hibernate.query name="DELETE_SCOPES_BY_INSTANCE" query="delete from HScope as s where s.instance = :instance" - * @hibernate.query name="DELETE_SCOPES_BY_PROCESS" query="delete from HScope as s where s.instance in (select i from HProcessInstance as i where i.process = :process)" + * @hibernate.query name="DELETE_SCOPES_BY_INSTANCES" query="delete from HScope as s where s.instance in (:instances)" */ public class HScope extends HObject{ - public final static String DELETE_SCOPES_BY_INSTANCE = "DELETE_SCOPES_BY_INSTANCE"; - public final static String DELETE_SCOPES_BY_PROCESS = "DELETE_SCOPES_BY_PROCESS"; + public final static String DELETE_SCOPES_BY_INSTANCES = "DELETE_SCOPES_BY_INSTANCES"; - /** Process instance to which this scope belongs. */ - private HProcessInstance _instance; + /** Process instance to which this scope belongs. */ + private HProcessInstance _instance; - /** Correlation set values for csets declared in this scope. */ - private Set<HCorrelationSet> _correlationSets = new HashSet<HCorrelationSet>(); + /** Correlation set values for csets declared in this scope. */ + private Set<HCorrelationSet> _correlationSets = new HashSet<HCorrelationSet>(); - /** Variable values for variables declared in this scope. */ - private Set<HXmlData> _variables = new HashSet<HXmlData>(); - - /** Enpoint References for partner links declared in this scope */ - private Set<HPartnerLink> _partnerLinks = new HashSet<HPartnerLink>(); - - /** Parent scope for this scope. */ - private HScope _parentScope; - - /** State of the scope. */ - private String _state; - - /** Scope type / name. */ - private String _name; - - private int _scopeModelId; - - public HScope() {} - - /** - * @hibernate.set - * lazy="true" - * inverse="true" - * cascade="delete" - * @hibernate.collection-key - * column="SCOPE_ID" foreign-key="none" - * @hibernate.collection-one-to-many - * class="org.apache.ode.daohib.bpel.hobj.HCorrelationSet" - */ - public Set<HCorrelationSet> getCorrelationSets() { - return _correlationSets; - } - - public void setCorrelationSets(Set<HCorrelationSet> correlationSets) { - _correlationSets = correlationSets; - } - - /** - * Get the {...@link HProcessInstance} to which this scope object belongs. - * @hibernate.many-to-one - * column="PIID" foreign-key="none" - */ - public HProcessInstance getInstance() { - return _instance; - } - - /** @see #getInstance() */ - public void setInstance(HProcessInstance instance) { - _instance = instance; - } - - /** - * Get the "parent" {...@link HScope} of this scope. - * @hibernate.many-to-one column="PARENT_SCOPE_ID" foreign-key="none" - */ - public HScope getParentScope() { - return _parentScope; - } - - /** @see #getParentScope() */ - public void setParentScope(HScope parentScope) { - _parentScope = parentScope; - } - - /** - * @hibernate.property column="STATE" not-null="true" - */ - public String getState() { - return _state; - } - - /** @see #getState() */ - public void setState(String state) { - _state = state; - } - - /** - * Get the type (i.e. the name) of the scope. - * @hibernate.property column="NAME" not-null="true" - */ - public String getName() { - return _name; - } - - /** @see #getName() */ - public void setName(String name) { - _name = name; - } - - - /** - * Get the variable values associated with this scope. - * - * @return {...@link Set}<{...@link HXmlData}> with variable values - * @hibernate.set lazy="true" inverse="true" cascade="delete" - * @hibernate.collection-key column="SCOPE_ID" foreign-key="none" - * @hibernate.collection-one-to-many class="org.apache.ode.daohib.bpel.hobj.HXmlData" - */ - public Set<HXmlData> getVariables() { - return _variables; - } - - public void setVariables(Set<HXmlData> variables){ - _variables = variables; - } - - /** - * Get the endpoint references for partner links roles associated with this scope. - * - * @return {...@link Set}<{...@link HPartnerLink}> with variable values - * @hibernate.set lazy="true" inverse="true" cascade="delete" - * @hibernate.collection-key column="SCOPE" foreign-key="none" - * @hibernate.collection-one-to-many class="org.apache.ode.daohib.bpel.hobj.HPartnerLink" - */ - public Set<HPartnerLink> getPartnerLinks() { - return _partnerLinks; - } - - public void setPartnerLinks(Set<HPartnerLink> eprs) { - _partnerLinks = eprs; - } - - /** - * @hibernate.property column="MODELID" - */ - public int getScopeModelId() { - return _scopeModelId; - } - - public void setScopeModelId(int scopeModelId) { - _scopeModelId = scopeModelId; - } + /** Variable values for variables declared in this scope. */ + private Set<HXmlData> _variables = new HashSet<HXmlData>(); + + /** Enpoint References for partner links declared in this scope */ + private Set<HPartnerLink> _partnerLinks = new HashSet<HPartnerLink>(); + + /** Parent scope for this scope. */ + private HScope _parentScope; + + /** State of the scope. */ + private String _state; + + /** Scope type / name. */ + private String _name; + + private int _scopeModelId; + + public HScope() {} + + /** + * @hibernate.set + * lazy="true" + * inverse="true" + * cascade="delete" + * @hibernate.collection-key + * column="SCOPE_ID" foreign-key="none" + * @hibernate.collection-one-to-many + * class="org.apache.ode.daohib.bpel.hobj.HCorrelationSet" + */ + public Set<HCorrelationSet> getCorrelationSets() { + return _correlationSets; + } + + public void setCorrelationSets(Set<HCorrelationSet> correlationSets) { + _correlationSets = correlationSets; + } + + /** + * Get the {...@link HProcessInstance} to which this scope object belongs. + * @hibernate.many-to-one + * column="PIID" foreign-key="none" + */ + public HProcessInstance getInstance() { + return _instance; + } + + /** @see #getInstance() */ + public void setInstance(HProcessInstance instance) { + _instance = instance; + } + + /** + * Get the "parent" {...@link HScope} of this scope. + * @hibernate.many-to-one column="PARENT_SCOPE_ID" foreign-key="none" + */ + public HScope getParentScope() { + return _parentScope; + } + + /** @see #getParentScope() */ + public void setParentScope(HScope parentScope) { + _parentScope = parentScope; + } + + /** + * @hibernate.property column="STATE" not-null="true" + */ + public String getState() { + return _state; + } + + /** @see #getState() */ + public void setState(String state) { + _state = state; + } + + /** + * Get the type (i.e. the name) of the scope. + * @hibernate.property column="NAME" not-null="true" + */ + public String getName() { + return _name; + } + + /** @see #getName() */ + public void setName(String name) { + _name = name; + } + + /** + * Get the variable values associated with this scope. + * + * @return {...@link Set}<{...@link HXmlData}> with variable values + * @hibernate.set lazy="true" inverse="true" cascade="delete" + * @hibernate.collection-key column="SCOPE_ID" foreign-key="none" + * @hibernate.collection-one-to-many class="org.apache.ode.daohib.bpel.hobj.HXmlData" + */ + public Set<HXmlData> getVariables() { + return _variables; + } + + public void setVariables(Set<HXmlData> variables){ + _variables = variables; + } + + /** + * Get the endpoint references for partner links roles associated with this scope. + * + * @return {...@link Set}<{...@link HPartnerLink}> with variable values + * @hibernate.set lazy="true" inverse="true" cascade="delete" + * @hibernate.collection-key column="SCOPE" foreign-key="none" + * @hibernate.collection-one-to-many class="org.apache.ode.daohib.bpel.hobj.HPartnerLink" + */ + public Set<HPartnerLink> getPartnerLinks() { + return _partnerLinks; + } + + public void setPartnerLinks(Set<HPartnerLink> eprs) { + _partnerLinks = eprs; + } + + /** + * @hibernate.property column="MODELID" + */ + public int getScopeModelId() { + return _scopeModelId; + } + + public void setScopeModelId(int scopeModelId) { + _scopeModelId = scopeModelId; + } public String toString() { return "HScope{id="+getId()+",name="+_name+"}";
Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HVariableProperty.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HVariableProperty.java?rev=766592&r1=766591&r2=766592&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HVariableProperty.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HVariableProperty.java Mon Apr 20 06:44:37 2009 @@ -25,65 +25,65 @@ * on indexed lookup of property values. * * @hibernate.class table="VAR_PROPERTY" - * @hibernate.query name="DELETE_VARIABLE_PROPERITES_BY_PROCESS" query="delete from HVariableProperty as p where p.xmlData in(select x.id from HXmlData x where x.instance.process = :process)" - * @hibernate.query name="DELETE_VARIABLE_PROPERITES_BY_INSTANCE" query="delete from HVariableProperty as p where p.xmlData in(select x.id from HXmlData x where x.instance = :instance)" + * @hibernate.query name="DELETE_VARIABLE_PROPERITES_BY_INSTANCES" query="delete from HVariableProperty as p where p.xmlData in(select x.id from HXmlData x where x.instance in (:instances))" */ public class HVariableProperty extends HObject{ - public final static String DELETE_VARIABLE_PROPERITES_BY_PROCESS = "DELETE_VARIABLE_PROPERITES_BY_PROCESS"; - public final static String DELETE_VARIABLE_PROPERITES_BY_INSTANCE = "DELETE_VARIABLE_PROPERITES_BY_INSTANCE"; - - private String _propertyValue; - private String _propertyName; - private HXmlData _variable; - - /** - * - */ - public HVariableProperty() { - super(); - } - - public HVariableProperty(HXmlData var, String name, String value){ - _variable = var; - _propertyName = name; - _propertyValue = value; - } - /** - * @hibernate.many-to-one - * column="XML_DATA_ID" foreign-key="none" - */ - public HXmlData getXmlData(){ - return _variable; - } - - public void setXmlData(HXmlData xmldata){ - _variable = xmldata; - } - - /** - * @hibernate.property - * column="PROP_VALUE" - * index="PROP_VALUE_IDX" - */ - public String getValue() { - return _propertyValue; - } - public void setValue(String value) { - _propertyValue = value; - } - /** - * @hibernate.property - * column="PROP_NAME" - * type="string" - * length="255" - * not-null="true" - * index="PROP_NAME_IDX" - */ - public String getName() { - return _propertyName; - } - public void setName(String name) { - _propertyName = name; - } - + public final static String DELETE_VARIABLE_PROPERITES_BY_INSTANCES = "DELETE_VARIABLE_PROPERITES_BY_INSTANCES"; + + private String _propertyValue; + private String _propertyName; + private HXmlData _variable; + + /** + * Default constructor + */ + public HVariableProperty() { + super(); + } + + public HVariableProperty(HXmlData var, String name, String value){ + _variable = var; + _propertyName = name; + _propertyValue = value; + } + + /** + * @hibernate.many-to-one + * column="XML_DATA_ID" foreign-key="none" + */ + public HXmlData getXmlData(){ + return _variable; + } + + public void setXmlData(HXmlData xmldata){ + _variable = xmldata; + } + + /** + * @hibernate.property + * column="PROP_VALUE" + * index="PROP_VALUE_IDX" + */ + public String getValue() { + return _propertyValue; + } + + public void setValue(String value) { + _propertyValue = value; + } + /** + * @hibernate.property + * column="PROP_NAME" + * type="string" + * length="255" + * not-null="true" + * index="PROP_NAME_IDX" + */ + public String getName() { + return _propertyName; + } + + public void setName(String name) { + _propertyName = name; + } } Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HXmlData.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HXmlData.java?rev=766592&r1=766591&r2=766592&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HXmlData.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HXmlData.java Mon Apr 20 06:44:37 2009 @@ -24,105 +24,102 @@ /** * @hibernate.class table="BPEL_XML_DATA" - * @hibernate.query name="DELETE_XMLDATA_BY_PROCESS" query="delete from HXmlData as x where x.instance in(select i from HProcessInstance as i where i.process = :process)" - * @hibernate.query name="DELETE_XMLDATA_BY_INSTANCE" query="delete from HXmlData as x where x.instance = :instance" + * @hibernate.query name="DELETE_XMLDATA_BY_INSTANCES" query="delete from HXmlData as x where x.instance in (:instances)" */ public class HXmlData extends HObject{ - public static final String DELETE_XMLDATA_BY_PROCESS = "DELETE_XMLDATA_BY_PROCESS"; - public static final String DELETE_XMLDATA_BY_INSTANCE = "DELETE_XMLDATA_BY_INSTANCE"; - - private boolean _simpleType; - private HLargeData _data; - private Collection<HVariableProperty> _properties = new HashSet<HVariableProperty>(); - private String _name; - private HScope _scope; - private HProcessInstance _instance; - - /** Constructor. */ - public HXmlData() { - super(); - } - - /** - * @hibernate.many-to-one column="LDATA_ID" cascade="delete" foreign-key="none" - */ - public HLargeData getData() { - return _data; - } - - public void setData(HLargeData data) { - _data = data; - } - - /** - * @hibernate.property - * column="NAME" - * type="string" - * length="255" - * not-null="true" - */ - public String getName() { - return _name; - } - - public void setName(String name) { - _name = name; - } - - /** - * @hibernate.bag - * lazy="true" - * inverse="true" - * cascade="delete" - * @hibernate.collection-key column="XML_DATA_ID" foreign-key="none" - * @hibernate.collection-one-to-many - * class="org.apache.ode.daohib.bpel.hobj.HVariableProperty" - */ - public Collection<HVariableProperty> getProperties() { - return _properties; - } - - public void setProperties(Collection<HVariableProperty> properties) { - _properties = properties; - } - - /** - * @hibernate.many-to-one column="SCOPE_ID" foreign-key="none" - */ - public HScope getScope() { - return _scope; - } - - public void setScope(HScope scope) { - _scope = scope; - - if(scope != null) { - setInstance(scope.getInstance()); - } - } - - /** - * @hibernate.many-to-one - * column="PIID" foreign-key="none" - */ - public HProcessInstance getInstance() { - return _instance; - } - - public void setInstance(HProcessInstance instance) { - _instance = instance; - } - - /** - * @hibernate.property - * column="IS_SIMPLE_TYPE" - */ - public boolean isSimpleType() { - return _simpleType; - } - - public void setSimpleType(boolean simpleType) { - _simpleType = simpleType; - } + public static final String DELETE_XMLDATA_BY_INSTANCES = "DELETE_XMLDATA_BY_INSTANCES"; + private boolean _simpleType; + private HLargeData _data; + private Collection<HVariableProperty> _properties = new HashSet<HVariableProperty>(); + private String _name; + private HScope _scope; + private HProcessInstance _instance; + + /** Constructor. */ + public HXmlData() { + super(); + } + + /** + * @hibernate.many-to-one column="LDATA_ID" cascade="delete" foreign-key="none" + */ + public HLargeData getData() { + return _data; + } + + public void setData(HLargeData data) { + _data = data; + } + + /** + * @hibernate.property + * column="NAME" + * type="string" + * length="255" + * not-null="true" + */ + public String getName() { + return _name; + } + + public void setName(String name) { + _name = name; + } + + /** + * @hibernate.bag + * lazy="true" + * inverse="true" + * cascade="delete" + * @hibernate.collection-key column="XML_DATA_ID" foreign-key="none" + * @hibernate.collection-one-to-many + * class="org.apache.ode.daohib.bpel.hobj.HVariableProperty" + */ + public Collection<HVariableProperty> getProperties() { + return _properties; + } + + public void setProperties(Collection<HVariableProperty> properties) { + _properties = properties; + } + + /** + * @hibernate.many-to-one column="SCOPE_ID" foreign-key="none" + */ + public HScope getScope() { + return _scope; + } + + public void setScope(HScope scope) { + _scope = scope; + + if(scope != null) { + setInstance(scope.getInstance()); + } + } + + /** + * @hibernate.many-to-one + * column="PIID" foreign-key="none" + */ + public HProcessInstance getInstance() { + return _instance; + } + + public void setInstance(HProcessInstance instance) { + _instance = instance; + } + + /** + * @hibernate.property + * column="IS_SIMPLE_TYPE" + */ + public boolean isSimpleType() { + return _simpleType; + } + + public void setSimpleType(boolean simpleType) { + _simpleType = simpleType; + } } Modified: ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java?rev=766592&r1=766591&r2=766592&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java Mon Apr 20 06:44:37 2009 @@ -19,6 +19,7 @@ package org.apache.ode.dao.jpa; +import java.io.Serializable; import java.sql.Timestamp; import java.text.ParseException; import java.util.ArrayList; @@ -56,7 +57,6 @@ * @author Matthieu Riou <mriou at apache dot org> */ public class BPELDAOConnectionImpl implements BpelDAOConnection { - static final Log __log = LogFactory.getLog(BPELDAOConnectionImpl.class); protected EntityManager _em; @@ -97,6 +97,13 @@ _em.persist(ret); return ret; } + + public ProcessDAO createTransientProcess(Serializable id) { + ProcessDAOImpl ret = new ProcessDAOImpl(null, null, null, 0); + ret.setId((Long)id); + + return ret; + } @SuppressWarnings("unchecked") public ProcessDAO getProcess(QName processId) { @@ -345,11 +352,11 @@ return map; } + @SuppressWarnings("unchecked") public Collection<CorrelationSetDAO> getActiveCorrelationSets() { return _em.createNamedQuery(CorrelationSetDAOImpl.SELECT_ACTIVE_SETS).setParameter("state", ProcessState.STATE_ACTIVE).getResultList(); } - public ProcessManagementDAO getProcessManagement() { return new ProcessManagementDAOImpl(_em); } Modified: ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java?rev=766592&r1=766591&r2=766592&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java Mon Apr 20 06:44:37 2009 @@ -30,6 +30,8 @@ import javax.persistence.*; import javax.xml.namespace.QName; + +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -45,11 +47,10 @@ @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process") }) public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO { - private static final Log __log = LogFactory.getLog(ProcessDAOImpl.class); - + private static final Log __log = LogFactory.getLog(ProcessDAOImpl.class); + @Id @Column(name="ID") @GeneratedValue(strategy= GenerationType.AUTO) - @SuppressWarnings("unused") private Long _id; @Basic @Column(name="PROCESS_ID") @@ -61,20 +62,28 @@ @Basic @Column(name="VERSION") private long _version; - @OneToMany(targetEntity=CorrelatorDAOImpl.class,mappedBy="_process",fetch=FetchType.LAZY,cascade={CascadeType.ALL}) + @OneToMany(targetEntity=CorrelatorDAOImpl.class,mappedBy="_process",fetch=FetchType.LAZY,cascade={CascadeType.ALL}) private Collection<CorrelatorDAOImpl> _correlators = new ArrayList<CorrelatorDAOImpl>(); - public ProcessDAOImpl() {} - public ProcessDAOImpl(QName pid, QName type, String guid, long version) { + public ProcessDAOImpl() {} + public ProcessDAOImpl(QName pid, QName type, String guid, long version) { _processId = pid.toString(); - _processType = type.toString(); - _guid = guid; + _processType = type.toString(); + _guid = guid; _version = version; } - - public CorrelatorDAO addCorrelator(String correlator) { - CorrelatorDAOImpl corr = new CorrelatorDAOImpl(correlator, this); - _correlators.add(corr); + + public Serializable getId() { + return _id; + } + + public void setId(Long id) { + _id = id; + } + + public CorrelatorDAO addCorrelator(String correlator) { + CorrelatorDAOImpl corr = new CorrelatorDAOImpl(correlator, this); + _correlators.add(corr); return corr; } @@ -88,99 +97,108 @@ return (CorrelatorDAO) res.get(0); } - public ProcessInstanceDAO createInstance( - CorrelatorDAO instantiatingCorrelator) { - ProcessInstanceDAOImpl inst = new ProcessInstanceDAOImpl((CorrelatorDAOImpl)instantiatingCorrelator, this); - getEM().persist(inst); - return inst; + public ProcessInstanceDAO createInstance(CorrelatorDAO instantiatingCorrelator) { + ProcessInstanceDAOImpl inst = new ProcessInstanceDAOImpl((CorrelatorDAOImpl)instantiatingCorrelator, this); + getEM().persist(inst); + return inst; } - public ProcessInstanceDAO createInstance( - CorrelatorDAO instantiatingCorrelator, MessageExchangeDAO mex) { - ProcessInstanceDAOImpl inst = new ProcessInstanceDAOImpl((CorrelatorDAOImpl)instantiatingCorrelator, this); - getEM().persist(inst); - return inst; + public ProcessInstanceDAO createInstance(CorrelatorDAO instantiatingCorrelator, MessageExchangeDAO mex) { + ProcessInstanceDAOImpl inst = new ProcessInstanceDAOImpl((CorrelatorDAOImpl)instantiatingCorrelator, this); + getEM().persist(inst); + return inst; } @SuppressWarnings("unchecked") public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey) { - Query qry = getEM().createNamedQuery("InstanceByCKey"); + Query qry = getEM().createNamedQuery("InstanceByCKey"); qry.setParameter("ckey", ckey.toCanonicalString()); return qry.getResultList(); - } + } - public ProcessInstanceDAO getInstance(Long iid) { - return getEM().find(ProcessInstanceDAOImpl.class, iid); - } + public ProcessInstanceDAO getInstance(Long iid) { + return getEM().find(ProcessInstanceDAOImpl.class, iid); + } - public QName getProcessId() { - return QName.valueOf(_processId); - } + public QName getProcessId() { + return QName.valueOf(_processId); + } - public QName getType() { - return QName.valueOf(_processType); - } + public QName getType() { + return QName.valueOf(_processType); + } - public void delete() { - if(__log.isDebugEnabled()) __log.debug("Cleaning up process data."); + @SuppressWarnings("unchecked") + public void deleteProcessAndRoutes() { + // delete routes + Collection instanceIds = getEM().createNamedQuery(ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS).setParameter("process", this).getResultList(); + batchUpdateByIds(instanceIds.iterator(), getEM().createNamedQuery(MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS), "instanceIds"); + getEM().createNamedQuery(CorrelatorDAOImpl.DELETE_CORRELATORS_BY_PROCESS).setParameter("process", this).executeUpdate(); - deleteEvents(); - deleteCorrelations(); - deleteMessages(); - deleteVariables(); - deleteProcessInstances(); + deleteInstances(Integer.MAX_VALUE); + + // delete process dao getEM().remove(this); // This deletes CorrelatorDAO getEM().flush(); } + + private int deleteInstances(int transactionSize) { + if(__log.isDebugEnabled()) __log.debug("Cleaning up process data."); + + deleteEvents(); + deleteCorrelations(); + deleteMessages(); + deleteVariables(); + deleteProcessInstances(); + + return 0; + } @SuppressWarnings("unchecked") private void deleteProcessInstances() { - Collection faultIds = getEM().createNamedQuery(ProcessInstanceDAOImpl.SELECT_FAULT_IDS_BY_PROCESS).setParameter("process", this).getResultList(); - batchUpdateByIds(faultIds.iterator(), getEM().createNamedQuery(FaultDAOImpl.DELETE_FAULTS_BY_IDS), "ids"); - Collection instanceIds = getEM().createNamedQuery(ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS).setParameter("process", this).getResultList(); - batchUpdateByIds(instanceIds.iterator(), getEM().createNamedQuery(ActivityRecoveryDAOImpl.DELETE_ACTIVITY_RECOVERIES_BY_IDS), "ids"); - getEM().createNamedQuery(ProcessInstanceDAOImpl.DELETE_INSTANCES_BY_PROCESS).setParameter("process", this).executeUpdate(); + Collection faultIds = getEM().createNamedQuery(ProcessInstanceDAOImpl.SELECT_FAULT_IDS_BY_PROCESS).setParameter("process", this).getResultList(); + batchUpdateByIds(faultIds.iterator(), getEM().createNamedQuery(FaultDAOImpl.DELETE_FAULTS_BY_IDS), "ids"); + Collection instanceIds = getEM().createNamedQuery(ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS).setParameter("process", this).getResultList(); + batchUpdateByIds(instanceIds.iterator(), getEM().createNamedQuery(ActivityRecoveryDAOImpl.DELETE_ACTIVITY_RECOVERIES_BY_IDS), "ids"); + getEM().createNamedQuery(ProcessInstanceDAOImpl.DELETE_INSTANCES_BY_PROCESS).setParameter("process", this).executeUpdate(); } @SuppressWarnings("unchecked") private void deleteVariables() { - Collection xmlDataIds = getEM().createNamedQuery(XmlDataDAOImpl.SELECT_XMLDATA_IDS_BY_PROCESS).setParameter("process", this).getResultList(); - batchUpdateByIds(xmlDataIds.iterator(), getEM().createNamedQuery(XmlDataProperty.DELETE_XML_DATA_PROPERTIES_BY_XML_DATA_IDS), "xmlDataIds"); - Collection scopeIds = getEM().createNamedQuery(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_PROCESS).setParameter("process", this).getResultList(); - batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(XmlDataDAOImpl.DELETE_XMLDATA_BY_SCOPE_IDS), "scopeIds"); - -// Collection scopeIds = getEM().createNamedQuery(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_PROCESS).setParameter("process", this).getResultList(); - batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(PartnerLinkDAOImpl.DELETE_PARTNER_LINKS_BY_SCOPE_IDS), "scopeIds"); - batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(ScopeDAOImpl.DELETE_SCOPES_BY_SCOPE_IDS), "ids"); + Collection xmlDataIds = getEM().createNamedQuery(XmlDataDAOImpl.SELECT_XMLDATA_IDS_BY_PROCESS).setParameter("process", this).getResultList(); + batchUpdateByIds(xmlDataIds.iterator(), getEM().createNamedQuery(XmlDataProperty.DELETE_XML_DATA_PROPERTIES_BY_XML_DATA_IDS), "xmlDataIds"); + Collection scopeIds = getEM().createNamedQuery(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_PROCESS).setParameter("process", this).getResultList(); + batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(XmlDataDAOImpl.DELETE_XMLDATA_BY_SCOPE_IDS), "scopeIds"); + +// Collection scopeIds = getEM().createNamedQuery(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_PROCESS).setParameter("process", this).getResultList(); + batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(PartnerLinkDAOImpl.DELETE_PARTNER_LINKS_BY_SCOPE_IDS), "scopeIds"); + batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(ScopeDAOImpl.DELETE_SCOPES_BY_SCOPE_IDS), "ids"); } @SuppressWarnings("unchecked") - private void deleteMessages() { - getEM().createNamedQuery(MessageDAOImpl.DELETE_MESSAGES_BY_PROCESS).setParameter("process", this).executeUpdate(); - Collection mexIds = getEM().createNamedQuery(MessageExchangeDAOImpl.SELECT_MEX_IDS_BY_PROCESS).setParameter("process", this).getResultList(); - batchUpdateByIds(mexIds.iterator(), getEM().createNamedQuery(MexProperty.DELETE_MEX_PROPERTIES_BY_MEX_IDS), "mexIds"); - getEM().createNamedQuery(MessageExchangeDAOImpl.DELETE_MEXS_BY_PROCESS).setParameter("process", this).executeUpdate(); - Collection instanceIds = getEM().createNamedQuery(ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS).setParameter("process", this).getResultList(); - batchUpdateByIds(instanceIds.iterator(), getEM().createNamedQuery(MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS), "instanceIds"); - getEM().createNamedQuery(CorrelatorDAOImpl.DELETE_CORRELATORS_BY_PROCESS).setParameter("process", this).executeUpdate(); - } + private void deleteMessages() { + getEM().createNamedQuery(MessageDAOImpl.DELETE_MESSAGES_BY_PROCESS).setParameter("process", this).executeUpdate(); + Collection mexIds = getEM().createNamedQuery(MessageExchangeDAOImpl.SELECT_MEX_IDS_BY_PROCESS).setParameter("process", this).getResultList(); + batchUpdateByIds(mexIds.iterator(), getEM().createNamedQuery(MexProperty.DELETE_MEX_PROPERTIES_BY_MEX_IDS), "mexIds"); + getEM().createNamedQuery(MessageExchangeDAOImpl.DELETE_MEXS_BY_PROCESS).setParameter("process", this).executeUpdate(); + } @SuppressWarnings("unchecked") private void deleteCorrelations() { - Collection corrSetIds = getEM().createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SET_IDS_BY_PROCESS).setParameter("process", this).getResultList(); - batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrSetProperty.DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS), "corrSetIds"); - batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrelationSetDAOImpl.DELETE_CORRELATION_SETS_BY_IDS), "ids"); + Collection corrSetIds = getEM().createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SET_IDS_BY_PROCESS).setParameter("process", this).getResultList(); + batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrSetProperty.DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS), "corrSetIds"); + batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrelationSetDAOImpl.DELETE_CORRELATION_SETS_BY_IDS), "ids"); } @SuppressWarnings("unchecked") private void deleteEvents() { - Collection eventIds = getEM().createNamedQuery(EventDAOImpl.SELECT_EVENT_IDS_BY_PROCESS).setParameter("process", this).getResultList(); - batchUpdateByIds(eventIds.iterator(), getEM().createNamedQuery(EventDAOImpl.DELETE_EVENTS_BY_IDS), "ids"); - } + Collection eventIds = getEM().createNamedQuery(EventDAOImpl.SELECT_EVENT_IDS_BY_PROCESS).setParameter("process", this).getResultList(); + batchUpdateByIds(eventIds.iterator(), getEM().createNamedQuery(EventDAOImpl.DELETE_EVENTS_BY_IDS), "ids"); + } public int getNumInstances() { - Long instanceCount = (Long) getSingleResult(getEM().createNamedQuery(ProcessInstanceDAOImpl.COUNT_INSTANCE_IDS_BY_PROCESS).setParameter("process", this)); - return (instanceCount == null ? 0 : instanceCount.intValue()); + Long instanceCount = (Long) getSingleResult(getEM().createNamedQuery(ProcessInstanceDAOImpl.COUNT_INSTANCE_IDS_BY_PROCESS).setParameter("process", this)); + return (instanceCount == null ? 0 : instanceCount.intValue()); } public long getVersion() { @@ -201,6 +219,7 @@ return _guid; } + @SuppressWarnings("unchecked") public Collection<ProcessInstanceDAO> getActiveInstances() { Query qry = getEM().createNamedQuery("ActiveInstances"); qry.setParameter("process", this); Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=766592&r1=766591&r2=766592&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original) +++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Mon Apr 20 06:44:37 2009 @@ -96,6 +96,8 @@ /** The object that actually handles the jobs. */ volatile JobProcessor _jobProcessor; + volatile JobProcessor _polledRunnableProcessor; + private SchedulerThread _todo; private DatabaseDelegate _db; @@ -168,6 +170,10 @@ _exec = executorService; } + public void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor) { + _polledRunnableProcessor = polledRunnableProcessor; + } + public void cancelJob(String jobId) throws ContextException { _todo.dequeue(new Job(0, jobId, false, null)); try { @@ -200,6 +206,7 @@ String errmsg = "Internal Error, could not begin transaction."; throw new ContextException(errmsg, ex); } + boolean success = false; try { T retval = transaction.call(); @@ -209,10 +216,10 @@ throw ex; } finally { if (success) { - if (__log.isDebugEnabled()) __log.debug("Commiting..."); + if (__log.isDebugEnabled()) __log.debug("Commiting on " + _txm + "..."); _txm.commit(); } else { - if (__log.isDebugEnabled()) __log.debug("Rollbacking..."); + if (__log.isDebugEnabled()) __log.debug("Rollbacking on " + _txm + "..."); _txm.rollback(); } } @@ -248,11 +255,27 @@ if (__log.isDebugEnabled()) __log.debug("scheduling " + jobDetail + " for " + when); - boolean immediate = when.getTime() <= ctime + _immediateInterval; - boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval; + return schedulePersistedJob(new Job(when.getTime(), true, jobDetail), when, ctime); + } + + public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException { + long ctime = System.currentTimeMillis(); + if (when == null) + when = new Date(ctime); + + Map<String, Object> jobDetails = new HashMap<String, Object>(); + jobDetails.put("runnable", runnable); + runnable.storeToDetailsMap(jobDetails); + + if (__log.isDebugEnabled()) + __log.debug("scheduling " + jobDetails + " for " + when); - Job job = new Job(when.getTime(), true, jobDetail); + return schedulePersistedJob(new Job(when.getTime(), true, jobDetails), when, ctime); + } + private String schedulePersistedJob(Job job, Date when, long ctime) throws ContextException { + boolean immediate = when.getTime() <= ctime + _immediateInterval; + boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval; try { if (immediate) { // Immediate scheduling means we put it in the DB for safe keeping @@ -362,8 +385,7 @@ /** * Run a job in the current thread. * - * @param job - * job to run. + * @param job job to run. */ protected void runJob(final Job job) { final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail, @@ -414,9 +436,79 @@ }); } + /** + * Run a job from a polled runnable thread. The runnable is not persistent, + * however, the poller is persistent and wakes up every given interval to + * check the status of the runnable. + * <ul> + * <li>1. The runnable is being scheduled; the poller persistent job dispatches + * the runnable to a runnable delegate thread and schedules itself to a later time.</li> + * <li>2. The runnable is running; the poller job re-schedules itself every time it + * sees the runnable is not completed.</li> + * <li>3. The runnable failed; the poller job passes the exception thrown on the runnable + * down, and the standard scheduler retries happen.</li> + * <li>4. The runnable completes; the poller persistent does not re-schedule itself.</li> + * <li>5. System powered off and restarts; the poller job does not know what the status + * of the runnable. This is handled just like the case #1.</li> + * </ul> + * + * There is at least one re-scheduling of the poller job. Since, the runnable's state is + * not persisted, and the same runnable may be tried again after system failure, + * the runnable that's used with this polling should be repeatable. + * + * @param job job to run. + */ + protected void runPolledRunnable(final Job job) { + final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail, + (Integer)(job.detail.get("retry") != null ? job.detail.get("retry") : 0)); + + _exec.submit(new Callable<Void>() { + public Void call() throws Exception { + try { + execTransaction(new Callable<Void>() { + public Void call() throws Exception { + if (!_db.deleteJob(job.jobId, _nodeId)) + throw new JobNoLongerInDbException(job.jobId,_nodeId); + + try { + _polledRunnableProcessor.onScheduledJob(jobInfo); + if( !"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.get("runnable_status"))) ) { + // the runnable is still in progress, schedule checker to 10 mins later + job.schedDate = System.currentTimeMillis() + 10 * 60 * 1000; + _db.insertJob(job, _nodeId, false); + } + } catch (JobProcessorException jpe) { + if (jpe.retry) { + int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0; + if (retry <= 10) { + long delay = doRetry(job); + __log.error("Error while processing transaction, retrying in " + delay + "s"); + } else { + __log.error("Error while processing transaction after 10 retries, no more retries:"+job); + } + } else { + __log.error("Error while processing transaction, no retry.", jpe); + } + // Let execTransaction know that shit happened. + throw jpe; + } + return null; + } + }); + } catch (JobNoLongerInDbException jde) { + // This may happen if two node try to do the same job... we try to avoid + // it the synchronization is a best-effort but not perfect. + __log.debug("job no longer in db forced rollback."); + } catch (Exception ex) { + __log.error("Error while executing transaction", ex); + } + return null; + } + }); + } + private void addTodoOnCommit(final Job job) { registerSynchronizer(new Synchronizer() { - public void afterCompletion(boolean success) { if (success) { _todo.enqueue(job); @@ -425,7 +517,6 @@ public void beforeCompletion() { } - }); } @@ -439,9 +530,14 @@ } public void runTask(Task task) { - if (task instanceof Job) - runJob((Job) task); - if (task instanceof SchedulerTask) + if (task instanceof Job) { + Job job = (Job)task; + if( job.detail.get("runnable") != null ) { + runPolledRunnable(job); + } else { + runJob((Job) task); + } + } else if (task instanceof SchedulerTask) ((SchedulerTask) task).run(); } @@ -530,11 +626,9 @@ __log.debug("recovering stale node " + nodeId); try { int numrows = execTransaction(new Callable<Integer>() { - public Integer call() throws Exception { return _db.updateReassign(nodeId, _nodeId); } - }); __log.debug("reassigned " + numrows + " jobs to self. "); @@ -571,7 +665,6 @@ } private class LoadImmediateTask extends SchedulerTask { - LoadImmediateTask(long schedDate) { super(schedDate); } @@ -596,7 +689,6 @@ * */ private class UpgradeJobsTask extends SchedulerTask { - UpgradeJobsTask(long schedDate) { super(schedDate); } @@ -624,14 +716,12 @@ __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms"); } } - } /** * Check if any of the nodes in our cluster are stale. */ private class CheckStaleNodes extends SchedulerTask { - CheckStaleNodes(long schedDate) { super(schedDate); } @@ -648,9 +738,5 @@ } } } - - } - - }
