Author: asankaa Date: Sun Nov 23 08:57:27 2008 New Revision: 24619 URL: http://wso2.org/svn/browse/wso2?view=rev&revision=24619
Log: remove message reciver and made eventsource as the core message reciver Removed: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java?rev=24619&r1=24618&r2=24619&view=diff ============================================================================== --- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java (original) +++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java Sun Nov 23 08:57:27 2008 @@ -20,21 +20,35 @@ package org.apache.synapse.eventing; import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.MessageContext; import org.apache.axis2.description.AxisOperation; import org.apache.axis2.description.AxisService; import org.apache.axis2.description.InOutAxisOperation; import org.apache.axis2.engine.AxisConfiguration; import org.apache.synapse.SynapseConstants; +import org.apache.synapse.util.MessageHelper; +import org.apache.synapse.endpoints.AddressEndpoint; +import org.apache.synapse.endpoints.EndpointDefinition; +import org.apache.synapse.eventing.builders.SubscriptionMessageBuilder; +import org.apache.synapse.eventing.builders.ResponseMessageBuilder; +import org.apache.synapse.config.SynapseConfiguration; +import org.apache.synapse.core.axis2.SynapseMessageReceiver; +import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.core.SynapseEnvironment; +import org.apache.axiom.soap.SOAPEnvelope; import org.wso2.eventing.EventingConstants; import javax.xml.namespace.QName; +import java.util.List; /** * */ -public class EventSource { +public class EventSource extends SynapseMessageReceiver { private String name; private SubscriptionManager subscriptionManager; + private EventSource eventSource; public EventSource(String name) { this.name = name; @@ -64,13 +78,64 @@ // Set the names of the two messages so that Axis2 is able to produce a WSDL (see SYNAPSE-366): // mediateOperation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE).setName("in"); // mediateOperation.getMessage(WSDLConstants.MESSAGE_LABEL_OUT_VALUE).setName("out"); - EventingMessageReceiver receiver = new EventingMessageReceiver(this); - mediateOperation.setMessageReceiver(receiver); - subscribeOperation.setMessageReceiver(receiver); + this.setEventSource(this); + mediateOperation.setMessageReceiver(this); + subscribeOperation.setMessageReceiver(this); subscribeOperation.setSoapAction(EventingConstants.WSE_SUBSCRIBE); eventSourceService.addOperation(mediateOperation); eventSourceService.addOperation(subscribeOperation); axisCfg.addService(eventSourceService); } + + /** + * + * @return + */ + public EventSource getEventSource() { + return eventSource; + } + + /** + * + * @param eventSource + */ + public void setEventSource(EventSource eventSource) { + this.eventSource = eventSource; + } + public void receive(MessageContext mc) throws AxisFault { + SynapseConfiguration synCfg = (SynapseConfiguration) mc.getConfigurationContext() + .getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_CONFIG).getValue(); + SynapseEnvironment synEnv = (SynapseEnvironment) mc.getConfigurationContext() + .getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_ENV).getValue(); + org.apache.synapse.MessageContext smc = new Axis2MessageContext(mc, synCfg, synEnv); + if (EventingConstants.WSE_SUBSCRIBE.equals(mc.getWSAAction())) { + Subscription subscription = SubscriptionMessageBuilder.createSubscription(smc); + eventSource.getSubscriptionManager().addSubscription(subscription); + ResponseMessageBuilder messageBuilder = new ResponseMessageBuilder(mc); + SOAPEnvelope soapEnvelope = messageBuilder.genSubscriptionResponse(subscription); + String replyAddress = mc.getOptions().getReplyTo().getAddress(); + AddressEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition def = new EndpointDefinition(); + def.setAddress(replyAddress.trim()); + endpoint.setDefinition(def); + org.apache.synapse.MessageContext rmc = new Axis2MessageContext(mc, synCfg, synEnv); + rmc.setTo(new EndpointReference(replyAddress)); + rmc.setEnvelope(soapEnvelope); + endpoint.send(MessageHelper.cloneMessageContext(rmc)); + }else if (EventingConstants.WSE_UNSUBSCRIBE.equals(mc.getWSAAction())) { + // Unsubscribe for responce + }else if (EventingConstants.WSE_GET_STATUS.equals(mc.getWSAAction())) { + // Get responce status + }else if (EventingConstants.WSE_RENEW.equals(mc.getWSAAction())) { + // Renew subscription + } else { + // Treat as an Event + List<Subscription> subscribers = eventSource.getSubscriptionManager().getMatchingSubscribers(smc); + for (Subscription subscription : subscribers) { + smc.setProperty("OUT_ONLY","true"); // Set one way message for events + subscription.getEndpoint().send(MessageHelper.cloneMessageContext(smc)); + } + } + } } Deleted: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java URL: http://wso2.org/svn/browse/wso2/None?pathrev=24618 _______________________________________________ Esb-java-dev mailing list [email protected] https://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev
