Author: mriou
Date: Tue Aug 26 08:45:27 2008
New Revision: 689116
URL: http://svn.apache.org/viewvc?rev=689116&view=rev
Log:
Finishing ODE-364
Added:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BrokeredMyRoleMessageExchangeImpl.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/SharedEndpoints.java
Added:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BrokeredMyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BrokeredMyRoleMessageExchangeImpl.java?rev=689116&view=auto
==============================================================================
---
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BrokeredMyRoleMessageExchangeImpl.java
(added)
+++
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BrokeredMyRoleMessageExchangeImpl.java
Tue Aug 26 08:45:27 2008
@@ -0,0 +1,78 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+
+
+/**
+ * A reliable MEP that delegates messages to a list of subscribers
+ *
+ * @author $author$
+ * @version $Revision$
+ */
+public class BrokeredMyRoleMessageExchangeImpl
+ extends MyRoleMessageExchangeImpl {
+ private List<MyRoleMessageExchange> subscribers;
+ private MyRoleMessageExchange template;
+
+ /**
+ * Creates a new BrokeredReliableMyRoleMessageExchangeImpl object.
+ *
+ * @param process
+ * @param subscribers
+ * @param mexId
+ * @param oplink
+ * @param template
+ */
+ public BrokeredMyRoleMessageExchangeImpl(BpelProcess process,
BpelEngineImpl engine,
+ List<MyRoleMessageExchange> subscribers, MessageExchangeDAO mexDao,
MyRoleMessageExchange template) {
+ super(process, engine, mexDao);
+ this.subscribers = subscribers;
+ this.template = template;
+ }
+
+ /**
+ * Propagate the invoke reliable call to each subscriber
+ */
+ public Future invoke(Message request) {
+ Future myFuture = null;
+ for (MyRoleMessageExchange subscriber : subscribers) {
+ Future theirFuture = subscriber.invoke(request);
+ if (myFuture == null) {
+ myFuture = theirFuture;
+ }
+ }
+ return myFuture;
+ }
+
+ /**
+ * Use the EPR of one of the subscribers as my EPR
+ *
+ * @return type
+ *
+ * @throws BpelEngineException BpelEngineException
+ */
+ @Override
+ public EndpointReference getEndpointReference() throws BpelEngineException
{
+ return template.getEndpointReference();
+ }
+
+ /**
+ * Use the response from one of the subscribers as my response
+ *
+ * @return type
+ */
+ @Override
+ public Message getResponse() {
+ return template.getResponse();
+ }
+}
Added:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/SharedEndpoints.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/SharedEndpoints.java?rev=689116&view=auto
==============================================================================
---
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/SharedEndpoints.java
(added)
+++
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/SharedEndpoints.java
Tue Aug 26 08:45:27 2008
@@ -0,0 +1,93 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.EndpointReference;
+
+
+/**
+ * An in-memory map from the endpoints provided by various processes in
+ * the server to their corresponding endpoint references.
+ *
+ * @author $author$
+ * @version $Revision$
+ */
+public class SharedEndpoints {
+ // Map of every endpoint provided by the server
+ private static Map<Endpoint, EndpointReference> _eprs = new
HashMap<Endpoint, EndpointReference>();
+ private static List<Endpoint> _referenceCounts = new ArrayList<Endpoint>();
+
+ /**
+ * Creates a new SharedEndpoints object.
+ */
+ public SharedEndpoints() {
+ init();
+ }
+
+ /**
+ * This is called when the server is initializing
+ */
+ public void init() {
+ _eprs.clear();
+ _referenceCounts.clear();
+ }
+
+ /**
+ * Add an endpoint along with its corresponding EPR
+ *
+ * @param endpoint endpoint
+ * @param epr epr
+ */
+ public void addEndpoint(Endpoint endpoint, EndpointReference epr) {
+ _eprs.put(endpoint, epr);
+ }
+
+ /**
+ * Remove an endpoint along with its EPR
+ * This is called when there are no more references
+ * to this endpoint from any BPEL process
+ * (which provides a service at this endpoint)
+ *
+ * @param endpoint endpoint
+ */
+ public void removeEndpoint(Endpoint endpoint) {
+ _eprs.remove(endpoint);
+ }
+
+ /**
+ * Get the EPR for an endpoint
+ *
+ * @param endpoint endpoint
+ *
+ * @return type
+ */
+ public EndpointReference getEndpointReference(Endpoint endpoint) {
+ return _eprs.get(endpoint);
+ }
+
+ /**
+ * Increment the number of BPEL processes who provide
+ * a service specifically at this endpoint.
+ *
+ * @param endpoint endpoint
+ */
+ public void incrementReferenceCount(Endpoint endpoint) {
+ _referenceCounts.add(endpoint);
+ }
+
+ /**
+ * Decrement the number of BPEL processes who provide
+ * a service specifically at this endpoint.
+ *
+ * @param endpoint endpoint
+ *
+ * @return type
+ */
+ public boolean decrementReferenceCount(Endpoint endpoint) {
+ return _referenceCounts.remove(endpoint);
+ }
+}