Added: 
ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test1.properties
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test1.properties?rev=709298&view=auto
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test1.properties
 (added)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test1.properties
 Thu Oct 30 16:58:51 2008
@@ -0,0 +1,23 @@
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+namespace=http://ode/bpel/unit-test.wsdl
+service=HelloService
+operation=hello
+request1=<message><TestPart><content>Hello</content></TestPart></message>
+response1=.*Hello World.*
+

Added: 
ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test2.properties
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test2.properties?rev=709298&view=auto
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test2.properties
 (added)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test2.properties
 Thu Oct 30 16:58:51 2008
@@ -0,0 +1,23 @@
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+namespace=http://ode/bpel/unit-test.wsdl
+service=HelloService
+operation=hello
+request1=<message><TestPart><content>Hello</content></TestPart></message>
+response1=.*Hello World.*
+

Added: 
ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test3.properties
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test3.properties?rev=709298&view=auto
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test3.properties
 (added)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-test/src/test/resources/bpel/2.0/TestPubSubOutOfProc/test3.properties
 Thu Oct 30 16:58:51 2008
@@ -0,0 +1,22 @@
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+namespace=http://ode/bpel/unit-test.wsdl
+service=HelloService
+operation=ping
+request1=<message><TestPart><content>Hello</content></TestPart></message>
+

Modified: 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java?rev=709298&r1=709297&r2=709298&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
 Thu Oct 30 16:58:51 2008
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,10 +56,10 @@
 
     /** filter for finding a matching selector. */
     private static final String FLTR_SELECTORS = ("from " + 
HCorrelatorSelector.class.getName()
-            + " hs where hs.correlationKey = ? and hs.processType = ? and 
hs.correlator.correlatorId = ?").intern();
+            + " hs where hs.correlationKey like ? and hs.processType = ? and 
hs.correlator.correlatorId = ?").intern();
 
     private static final String LOCK_SELECTORS = "update from " + 
HCorrelatorSelector.class.getName() +
-        " set lock = lock+1 where correlationKey = ? and processType = 
?".intern();
+        " set lock = lock+1 where correlationKey like ? and processType = 
?".intern();
     
     /** Query for removing routes. */
     private static final String QRY_DELSELECTORS = "delete from " + 
HCorrelatorSelector.class.getName()
@@ -104,7 +105,9 @@
         }
     }
 
-    public MessageRouteDAO findRoute(CorrelationKey key) {
+    public List<MessageRouteDAO> findRoute(CorrelationKey key) {
+       List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+       
         entering("CorrelatorDaoImpl.findRoute");
         String hdr = "findRoute(key=" + key + "): ";
         if (__log.isDebugEnabled())
@@ -117,19 +120,30 @@
         // is a much safer alternative.
         String processType = new QName(_hobj.getProcess().getTypeNamespace(), 
_hobj.getProcess().getTypeName()).toString();
         Query lockQry = getSession().createQuery(LOCK_SELECTORS);
-        lockQry.setString(0, key == null ? null : key.toCanonicalString());
+        lockQry.setString(0, key == null ? "%" : key.toCanonicalString());
         lockQry.setString(1, processType);
         if (lockQry.executeUpdate() > 0) {
             
             Query q = getSession().createQuery(FLTR_SELECTORS);
-            q.setString(0, key == null ? null : key.toCanonicalString());
+            q.setString(0, key == null ? "%" : key.toCanonicalString());
             q.setString(1, processType);
             q.setString(2, _hobj.getCorrelatorId());
             q.setLockMode("hs", LockMode.UPGRADE);
 
             HCorrelatorSelector selector;
             try {
-                selector = (HCorrelatorSelector) q.uniqueResult();
+               List<HProcessInstance> targets = new 
ArrayList<HProcessInstance>();
+               Iterator selectors = q.iterate();
+               while (selectors.hasNext()) {
+                    selector = (HCorrelatorSelector) selectors.next();
+                    if (selector != null) {
+                       if ("all".equals(selector.getRoute()) || 
+                                       ("one".equals(selector.getRoute()) && 
!targets.contains(selector.getInstance()))) {
+                               routes.add(new MessageRouteDaoImpl(_sm, 
selector));
+                               targets.add(selector.getInstance());
+                       }
+                    }
+               }
             } catch (Exception ex) {
                 __log.debug("Strange, could not get a unique result for 
findRoute, trying to iterate instead.");
 
@@ -139,8 +153,8 @@
                 Hibernate.close(i);
             }
     
-            __log.debug(hdr + "found " + selector);
-            return selector == null ? null : new MessageRouteDaoImpl(_sm, 
selector);
+            __log.debug(hdr + "found " + routes);
+            return routes;
         } 
         
         return null;
@@ -177,7 +191,7 @@
         return ret;
     }
 
-    public void addRoute(String routeGroupId, ProcessInstanceDAO target, int 
idx, CorrelationKey correlationKey) {
+    public void addRoute(String routeGroupId, ProcessInstanceDAO target, int 
idx, CorrelationKey correlationKey, String routePolicy) {
         entering("CorrelatorDaoImpl.addRoute");
         String hdr = "addRoute(" + routeGroupId + ", iid=" + 
target.getInstanceId() + ", idx=" + idx + ", ckey="
                 + correlationKey + "): ";
@@ -192,6 +206,7 @@
         hsel.setProcessType(target.getProcess().getType().toString());
         hsel.setCorrelator(_hobj);
         hsel.setCreated(new Date());
+        hsel.setRoutePolicy(routePolicy);
 //        _hobj.addSelector(hsel);
         getSession().save(hsel);
 

Modified: 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java?rev=709298&r1=709297&r2=709298&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
 Thu Oct 30 16:58:51 2008
@@ -337,6 +337,19 @@
         _hself.setPipedMessageExchangeId(mexId);
     }
 
+
+       public int getSubscriberCount() {
+               return _hself.getSubscriberCount();
+       }
+       
+       public void setSubscriberCount(int subscriberCount) {
+               _hself.setSubscriberCount(subscriberCount);             
+       }
+
+       public void incrementSubscriberCount() {
+               _hself.incrementSubscriberCount();
+       }
+       
     public void release() {
         // no-op for now, could be used to do some cleanup
     }

Modified: 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageRouteDaoImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageRouteDaoImpl.java?rev=709298&r1=709297&r2=709298&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageRouteDaoImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageRouteDaoImpl.java
 Thu Oct 30 16:58:51 2008
@@ -69,5 +69,9 @@
         entering("MessageRouteDaoImpl.getIndex");
         return _selector.getIndex();
     }
+    
+    public String getRoute() {
+       return _selector.getRoute();
+    }
 
 }

Modified: 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java?rev=709298&r1=709297&r2=709298&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java
 Thu Oct 30 16:58:51 2008
@@ -31,6 +31,7 @@
     private HCorrelator _correlator;
     private String _correlationKey;
     private String _processType;
+    private String _routePolicy;
     
     /**
      * @hibernate.many-to-one column="PIID" not-null="true"
@@ -101,6 +102,17 @@
     }
 
     /**
+     * @hibernate.property column="ROUTE_POLICY" not-null="true"
+     */
+    public String getRoute() {
+        return _routePolicy;
+    }
+
+    public void setRoutePolicy(String _routePolicy) {
+        this._routePolicy = _routePolicy;
+    }
+
+    /**
      * @hibernate.many-to-one not-null="true"
      * @hibernate.column name="CORRELATOR" not-null="true" 
      *          index="IDX_SELECTOR_CORRELATOR" unique-key="UNIQ_SELECTOR"

Modified: 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java?rev=709298&r1=709297&r2=709298&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
 Thu Oct 30 16:58:51 2008
@@ -71,6 +71,8 @@
     private String _callee;
 
     private String _pipedMessageExchangeId;
+    
+    private int _subscriberCount;
 
     private Map<String, String> _properties = new HashMap<String, String>();
 
@@ -334,4 +336,16 @@
     public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
         _pipedMessageExchangeId = pipedMessageExchangeId;
     }
+    
+    public int getSubscriberCount() {
+       return _subscriberCount;
+    }
+    
+    public void setSubscriberCount(int subscriberCount) {
+       this._subscriberCount = subscriberCount;
+    }
+    
+    public void incrementSubscriberCount() {
+       _subscriberCount++;
+    }
 }

Modified: 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java?rev=709298&r1=709297&r2=709298&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
 Thu Oct 30 16:58:51 2008
@@ -36,7 +36,7 @@
 @NamedQueries({
     @NamedQuery(name="RouteByCKey", query="SELECT route " +
             "FROM MessageRouteDAOImpl as route " +
-            "WHERE route._correlationKey = :ckey " +
+            "WHERE route._correlationKey like :ckey " +
                    "and route._correlator._process._processType = :ptype " +
                    "and route._correlator._correlatorKey = :corrkey")
         })
@@ -60,9 +60,9 @@
         _process = process;
     }
 
-    public void addRoute(String routeGroupId, ProcessInstanceDAO target, int 
index, CorrelationKey correlationKey) {
+    public void addRoute(String routeGroupId, ProcessInstanceDAO target, int 
index, CorrelationKey correlationKey, String routePolicy) {
         MessageRouteDAOImpl mr = new MessageRouteDAOImpl(correlationKey,
-                routeGroupId, index, (ProcessInstanceDAOImpl) target, this);
+                routeGroupId, index, (ProcessInstanceDAOImpl) target, this, 
routePolicy);
         _routes.add(mr);
     }
 
@@ -88,15 +88,27 @@
 
     }
 
-    public MessageRouteDAO findRoute(CorrelationKey correlationKey) {
+    public List<MessageRouteDAO> findRoute(CorrelationKey correlationKey) {
         Query qry = getEM().createNamedQuery("RouteByCKey");
-        qry.setParameter("ckey", correlationKey.toCanonicalString());
+        qry.setParameter("ckey", correlationKey == null ? "%" : 
correlationKey.toCanonicalString());
         qry.setParameter("ptype", _process.getType().toString());
         qry.setParameter("corrkey", _correlatorKey);
         List<MessageRouteDAO> routes = (List<MessageRouteDAO>) 
qry.getResultList();
         if (routes.size() > 0) {
-          return routes.get(0);
-        } else return null;
+               List<ProcessInstanceDAO> targets = new 
ArrayList<ProcessInstanceDAO>();
+            for (int i = 0; i < routes.size(); i++) {
+               MessageRouteDAO route = routes.get(i);
+               if ("all".equals(route.getRoute()) || 
+                               ("one".equals(route.getRoute()) && 
!targets.contains(route.getTargetInstance()))) {
+                       targets.add(route.getTargetInstance());
+               } else {
+                       routes.remove(i);
+               }
+            }
+            return routes;
+        } else {
+               return null;
+        }
     }
 
     public String getCorrelatorId() {

Modified: 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?rev=709298&r1=709297&r2=709298&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
 Thu Oct 30 16:58:51 2008
@@ -91,6 +91,8 @@
     private String _correlationKeys;
     @Basic @Column(name="PIPED_ID")
     private String _pipedMessageExchangeId;
+    @Basic @Column(name="SUBSCRIBER_COUNT")
+    private int _subscriberCount;
 
     
@OneToMany(targetEntity=MexProperty.class,mappedBy="_mex",fetch=FetchType.EAGER,cascade={CascadeType.ALL})
     private Collection<MexProperty> _props = new ArrayList<MexProperty>();
@@ -329,11 +331,6 @@
         return correlationKeys;
     }
 
-
-    public void release() {
-        // no-op for now, could be used to do some cleanup
-    }
-
     public CorrelatorDAOImpl getCorrelator() {
         return _correlator;
     }
@@ -341,4 +338,21 @@
     public void setCorrelator(CorrelatorDAOImpl correlator) {
         _correlator = correlator;
     }
+
+       public int getSubscriberCount() {
+               return _subscriberCount;
+       }
+       
+       public void setSubscriberCount(int subscriberCount) {
+               this._subscriberCount = subscriberCount;
+       }
+
+       public void incrementSubscriberCount() {
+               ++_subscriberCount;
+       }
+       
+    public void release() {
+        // no-op for now, could be used to do some cleanup
+    }
+
 }

Modified: 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageRouteDAOImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageRouteDAOImpl.java?rev=709298&r1=709297&r2=709298&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageRouteDAOImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageRouteDAOImpl.java
 Thu Oct 30 16:58:51 2008
@@ -47,6 +47,8 @@
     private int _index;
        @Basic @Column(name="CORRELATION_KEY")
     private String _correlationKey;
+       @Basic @Column(name="ROUTE_POLICY")
+    private String _routePolicy;       
 
     @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) 
@Column(name="PROCESS_INSTANCE_ID")
     private ProcessInstanceDAOImpl _processInst;
@@ -55,12 +57,13 @@
 
     public MessageRouteDAOImpl() {}
        public MessageRouteDAOImpl(CorrelationKey key, String groupId, int 
index,
-                               ProcessInstanceDAOImpl processInst, 
CorrelatorDAOImpl correlator) {
+                               ProcessInstanceDAOImpl processInst, 
CorrelatorDAOImpl correlator, String routePolicy) {
                _correlationKey = key.toCanonicalString();
                _groupId = groupId;
                _index = index;
                _processInst = processInst;
         _correlator = correlator;
+        _routePolicy = routePolicy;
     }
 
     public Long getId() {
@@ -82,5 +85,9 @@
        public ProcessInstanceDAO getTargetInstance() {
                return _processInst;
        }
+       
+       public String getRoute() {
+               return _routePolicy;
+       }
 
 }


Reply via email to