Author: asankaa
Date: Mon Jan 26 23:56:08 2009
New Revision: 29558
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=29558

Log:
fixing the 202 response issue with event dispatching.


Modified:
   
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java

Modified: 
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
URL: 
http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java?rev=29558&r1=29557&r2=29558&view=diff
==============================================================================
--- 
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
    (original)
+++ 
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/mediators/eventing/EventPublisherMediator.java
    Mon Jan 26 23:56:08 2009
@@ -19,12 +19,9 @@
 */
 
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.MessageContext;
-import org.apache.synapse.endpoints.AddressEndpoint;
-import org.apache.synapse.endpoints.EndpointDefinition;
 import org.apache.synapse.eventing.SynapseEventSource;
 import org.apache.synapse.eventing.SynapseSubscription;
 import org.apache.synapse.eventing.SynapseSubscriptionManager;
@@ -41,24 +38,13 @@
         if (log.isDebugEnabled()) {
             log.debug("Mediation for Event Publisher started");
         }
-        //sendResponce(synCtx); TODO need to investigate this further 
         SynapseEventSource eventSource = 
synCtx.getConfiguration().getEventSource(eventSourceName);
         SynapseSubscriptionManager subscriptionManager = 
eventSource.getSubscriptionManager();
         List<SynapseSubscription> subscribers = 
subscriptionManager.getMatchingSubscribers(synCtx);
-        for (SynapseSubscription subscription : subscribers) {
-            //TODO: send a 202 responce to the client, client wait and time 
outs
-            synCtx.setProperty("OUT_ONLY", "true");    // Set one way message 
for events
-            try {
-                
subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
-            } catch (AxisFault axisFault) {
-                log.error("Event sending failure " + axisFault.toString());
-                return false;
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Event push to  : " + subscription.getEndpointUrl());
-            }
-        }
-        return true;  //To change body of implemented methods use File | 
Settings | File Templates.
+        // Call event dispatcher
+        synCtx.getEnvironment().getExecutorService()
+                .execute(new EventDispatcher(synCtx, subscribers));
+        return true;
     }
 
     public String getEventSourceName() {
@@ -69,23 +55,30 @@
         this.eventSourceName = eventSourceName;
     }
 
-    private void sendResponce(MessageContext synCtx) {
-        MessageContext mc = null;
-        try {
-            mc = MessageHelper.cloneMessageContext(synCtx);
-            String replyAddress = mc.getReplyTo().getAddress();
-            AddressEndpoint endpoint = new AddressEndpoint();
-            EndpointDefinition def = new EndpointDefinition();
-            def.setAddress(replyAddress.trim());
-            def.setAddressingOn(true);
-            endpoint.setDefinition(def);
-            //mc.setEnvelope(null);
-            mc.setTo(new EndpointReference(replyAddress));
-            mc.setResponse(true);
-            endpoint.send(mc);
-        } catch (AxisFault axisFault) {
-            axisFault
-                    .printStackTrace();  //To change body of catch statement 
use File | Settings | File Templates.
+    /**
+     * Dispatching events async on a different thread
+     */
+    class EventDispatcher implements Runnable {
+        MessageContext synCtx;
+        List<SynapseSubscription> subscribers;
+
+        EventDispatcher(MessageContext synCtx, List<SynapseSubscription> 
subscribers) {
+            this.synCtx = synCtx;
+            this.subscribers = subscribers;
+        }
+
+        public void run() {
+            for (SynapseSubscription subscription : subscribers) {
+                synCtx.setProperty("OUT_ONLY", "true");    // Set one way 
message for events
+                try {
+                    
subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
+                } catch (AxisFault axisFault) {
+                    log.error("Event sending failure " + axisFault.toString());
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("Event push to  : " + 
subscription.getEndpointUrl());
+                }
+            }
         }
     }
 }

_______________________________________________
Esb-java-dev mailing list
[email protected]
https://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev

Reply via email to