Added: ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test1.properties
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test1.properties?rev=709298&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test1.properties (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test1.properties Thu Oct 30 16:58:51 2008
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+namespace=http://ode/bpel/unit-test.wsdl
+service=HelloService
+operation=hello
+request1=<message><TestPart><content>Hello</content></TestPart></message>
+response1=.*Hello World.*
+
Added: ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test2.properties URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test2.properties?rev=709298&view=auto ============================================================================== --- ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test2.properties (added) +++ ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test2.properties Thu Oct 30 16:58:51 2008 @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +namespace=http://ode/bpel/unit-test.wsdl +service=HelloService +operation=hello +request1=<message><TestPart><content>Hello</content></TestPart></message> +response1=.*Hello World.* + Added: ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test3.properties URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test3.properties?rev=709298&view=auto ============================================================================== --- ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test3.properties (added) +++ ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test3.properties Thu Oct 30 16:58:51 2008 @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +namespace=http://ode/bpel/unit-test.wsdl +service=HelloService +operation=ping +request1=<message><TestPart><content>Hello</content></TestPart></message> + Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java?rev=709298&r1=709297&r2=709298&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java Thu Oct 30 16:58:51 2008 @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.Iterator; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,10 +56,10 @@ /** filter for finding a matching selector. */ private static final String FLTR_SELECTORS = ("from " + HCorrelatorSelector.class.getName() - + " hs where hs.correlationKey = ? and hs.processType = ? and hs.correlator.correlatorId = ?").intern(); + + " hs where hs.correlationKey like ? and hs.processType = ? and hs.correlator.correlatorId = ?").intern(); private static final String LOCK_SELECTORS = "update from " + HCorrelatorSelector.class.getName() + - " set lock = lock+1 where correlationKey = ? and processType = ?".intern(); + " set lock = lock+1 where correlationKey like ? and processType = ?".intern(); /** Query for removing routes. 
*/ private static final String QRY_DELSELECTORS = "delete from " + HCorrelatorSelector.class.getName() @@ -104,7 +105,9 @@ } } - public MessageRouteDAO findRoute(CorrelationKey key) { + public List<MessageRouteDAO> findRoute(CorrelationKey key) { + List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>(); + entering("CorrelatorDaoImpl.findRoute"); String hdr = "findRoute(key=" + key + "): "; if (__log.isDebugEnabled()) @@ -117,19 +120,30 @@ // is a much safer alternative. String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString(); Query lockQry = getSession().createQuery(LOCK_SELECTORS); - lockQry.setString(0, key == null ? null : key.toCanonicalString()); + lockQry.setString(0, key == null ? "%" : key.toCanonicalString()); lockQry.setString(1, processType); if (lockQry.executeUpdate() > 0) { Query q = getSession().createQuery(FLTR_SELECTORS); - q.setString(0, key == null ? null : key.toCanonicalString()); + q.setString(0, key == null ? "%" : key.toCanonicalString()); q.setString(1, processType); q.setString(2, _hobj.getCorrelatorId()); q.setLockMode("hs", LockMode.UPGRADE); HCorrelatorSelector selector; try { - selector = (HCorrelatorSelector) q.uniqueResult(); + List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); + Iterator selectors = q.iterate(); + while (selectors.hasNext()) { + selector = (HCorrelatorSelector) selectors.next(); + if (selector != null) { + if ("all".equals(selector.getRoute()) || + ("one".equals(selector.getRoute()) && !targets.contains(selector.getInstance()))) { + routes.add(new MessageRouteDaoImpl(_sm, selector)); + targets.add(selector.getInstance()); + } + } + } } catch (Exception ex) { __log.debug("Strange, could not get a unique result for findRoute, trying to iterate instead."); @@ -139,8 +153,8 @@ Hibernate.close(i); } - __log.debug(hdr + "found " + selector); - return selector == null ? 
null : new MessageRouteDaoImpl(_sm, selector); + __log.debug(hdr + "found " + routes); + return routes; } return null; @@ -177,7 +191,7 @@ return ret; } - public void addRoute(String routeGroupId, ProcessInstanceDAO target, int idx, CorrelationKey correlationKey) { + public void addRoute(String routeGroupId, ProcessInstanceDAO target, int idx, CorrelationKey correlationKey, String routePolicy) { entering("CorrelatorDaoImpl.addRoute"); String hdr = "addRoute(" + routeGroupId + ", iid=" + target.getInstanceId() + ", idx=" + idx + ", ckey=" + correlationKey + "): "; @@ -192,6 +206,7 @@ hsel.setProcessType(target.getProcess().getType().toString()); hsel.setCorrelator(_hobj); hsel.setCreated(new Date()); + hsel.setRoutePolicy(routePolicy); // _hobj.addSelector(hsel); getSession().save(hsel); Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java?rev=709298&r1=709297&r2=709298&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java Thu Oct 30 16:58:51 2008 @@ -337,6 +337,19 @@ _hself.setPipedMessageExchangeId(mexId); } + + public int getSubscriberCount() { + return _hself.getSubscriberCount(); + } + + public void setSubscriberCount(int subscriberCount) { + _hself.setSubscriberCount(subscriberCount); + } + + public void incrementSubscriberCount() { + _hself.incrementSubscriberCount(); + } + public void release() { // no-op for now, could be used to do some cleanup } Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageRouteDaoImpl.java URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageRouteDaoImpl.java?rev=709298&r1=709297&r2=709298&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageRouteDaoImpl.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageRouteDaoImpl.java Thu Oct 30 16:58:51 2008 @@ -69,5 +69,9 @@ entering("MessageRouteDaoImpl.getIndex"); return _selector.getIndex(); } + + public String getRoute() { + return _selector.getRoute(); + } } Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java?rev=709298&r1=709297&r2=709298&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java Thu Oct 30 16:58:51 2008 @@ -31,6 +31,7 @@ private HCorrelator _correlator; private String _correlationKey; private String _processType; + private String _routePolicy; /** * @hibernate.many-to-one column="PIID" not-null="true" @@ -101,6 +102,17 @@ } /** + * @hibernate.property column="ROUTE_POLICY" not-null="true" + */ + public String getRoute() { + return _routePolicy; + } + + public void setRoutePolicy(String _routePolicy) { + this._routePolicy = _routePolicy; + } + + /** * @hibernate.many-to-one not-null="true" * @hibernate.column name="CORRELATOR" not-null="true" * index="IDX_SELECTOR_CORRELATOR" unique-key="UNIQ_SELECTOR" Modified: 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java?rev=709298&r1=709297&r2=709298&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java Thu Oct 30 16:58:51 2008 @@ -71,6 +71,8 @@ private String _callee; private String _pipedMessageExchangeId; + + private int _subscriberCount; private Map<String, String> _properties = new HashMap<String, String>(); @@ -334,4 +336,16 @@ public void setPipedMessageExchangeId(String pipedMessageExchangeId) { _pipedMessageExchangeId = pipedMessageExchangeId; } + + public int getSubscriberCount() { + return _subscriberCount; + } + + public void setSubscriberCount(int subscriberCount) { + this._subscriberCount = subscriberCount; + } + + public void incrementSubscriberCount() { + _subscriberCount++; + } } Modified: ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java?rev=709298&r1=709297&r2=709298&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java Thu Oct 30 16:58:51 2008 @@ -36,7 +36,7 @@ @NamedQueries({ @NamedQuery(name="RouteByCKey", query="SELECT route " + "FROM MessageRouteDAOImpl as route " + - "WHERE route._correlationKey = :ckey " + + "WHERE 
route._correlationKey like :ckey " + "and route._correlator._process._processType = :ptype " + "and route._correlator._correlatorKey = :corrkey") }) @@ -60,9 +60,9 @@ _process = process; } - public void addRoute(String routeGroupId, ProcessInstanceDAO target, int index, CorrelationKey correlationKey) { + public void addRoute(String routeGroupId, ProcessInstanceDAO target, int index, CorrelationKey correlationKey, String routePolicy) { MessageRouteDAOImpl mr = new MessageRouteDAOImpl(correlationKey, - routeGroupId, index, (ProcessInstanceDAOImpl) target, this); + routeGroupId, index, (ProcessInstanceDAOImpl) target, this, routePolicy); _routes.add(mr); } @@ -88,15 +88,27 @@ } - public MessageRouteDAO findRoute(CorrelationKey correlationKey) { + public List<MessageRouteDAO> findRoute(CorrelationKey correlationKey) { Query qry = getEM().createNamedQuery("RouteByCKey"); - qry.setParameter("ckey", correlationKey.toCanonicalString()); + qry.setParameter("ckey", correlationKey == null ? "%" : correlationKey.toCanonicalString()); qry.setParameter("ptype", _process.getType().toString()); qry.setParameter("corrkey", _correlatorKey); List<MessageRouteDAO> routes = (List<MessageRouteDAO>) qry.getResultList(); if (routes.size() > 0) { - return routes.get(0); - } else return null; + List<ProcessInstanceDAO> targets = new ArrayList<ProcessInstanceDAO>(); + for (int i = 0; i < routes.size(); i++) { + MessageRouteDAO route = routes.get(i); + if ("all".equals(route.getRoute()) || + ("one".equals(route.getRoute()) && !targets.contains(route.getTargetInstance()))) { + targets.add(route.getTargetInstance()); + } else { + routes.remove(i); + } + } + return routes; + } else { + return null; + } } public String getCorrelatorId() { Modified: ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?rev=709298&r1=709297&r2=709298&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java (original) +++ ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java Thu Oct 30 16:58:51 2008 @@ -91,6 +91,8 @@ private String _correlationKeys; @Basic @Column(name="PIPED_ID") private String _pipedMessageExchangeId; + @Basic @Column(name="SUBSCRIBER_COUNT") + private int _subscriberCount; @OneToMany(targetEntity=MexProperty.class,mappedBy="_mex",fetch=FetchType.EAGER,cascade={CascadeType.ALL}) private Collection<MexProperty> _props = new ArrayList<MexProperty>(); @@ -329,11 +331,6 @@ return correlationKeys; } - - public void release() { - // no-op for now, could be used to do some cleanup - } - public CorrelatorDAOImpl getCorrelator() { return _correlator; } @@ -341,4 +338,21 @@ public void setCorrelator(CorrelatorDAOImpl correlator) { _correlator = correlator; } + + public int getSubscriberCount() { + return _subscriberCount; + } + + public void setSubscriberCount(int subscriberCount) { + this._subscriberCount = subscriberCount; + } + + public void incrementSubscriberCount() { + ++_subscriberCount; + } + + public void release() { + // no-op for now, could be used to do some cleanup + } + } Modified: ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageRouteDAOImpl.java URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageRouteDAOImpl.java?rev=709298&r1=709297&r2=709298&view=diff ============================================================================== --- ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageRouteDAOImpl.java (original) +++ 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageRouteDAOImpl.java Thu Oct 30 16:58:51 2008 @@ -47,6 +47,8 @@ private int _index; @Basic @Column(name="CORRELATION_KEY") private String _correlationKey; + @Basic @Column(name="ROUTE_POLICY") + private String _routePolicy; @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="PROCESS_INSTANCE_ID") private ProcessInstanceDAOImpl _processInst; @@ -55,12 +57,13 @@ public MessageRouteDAOImpl() {} public MessageRouteDAOImpl(CorrelationKey key, String groupId, int index, - ProcessInstanceDAOImpl processInst, CorrelatorDAOImpl correlator) { + ProcessInstanceDAOImpl processInst, CorrelatorDAOImpl correlator, String routePolicy) { _correlationKey = key.toCanonicalString(); _groupId = groupId; _index = index; _processInst = processInst; _correlator = correlator; + _routePolicy = routePolicy; } public Long getId() { @@ -82,5 +85,9 @@ public ProcessInstanceDAO getTargetInstance() { return _processInst; } + + public String getRoute() { + return _routePolicy; + } }
