Author: asankaa Date: Mon Nov 24 11:54:27 2008 New Revision: 24751 URL: http://wso2.org/svn/browse/wso2?view=rev&revision=24751
Log: Subscribe Renew, GetStatus complet cycle with the samples to test Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSubscriber.java branches/synapse/1.2.wso2v1/modules/samples/src/main/scripts/build.xml Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java?rev=24751&r1=24750&r2=24751&view=diff ============================================================================== --- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java (original) +++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java Mon Nov 24 11:54:27 2008 @@ -145,8 +145,49 @@ } } else if (EventingConstants.WSE_GET_STATUS.equals(mc.getWSAAction())) { // Get responce status + Subscription subscription = SubscriptionMessageBuilder.createGetStatusMessage(smc); + subscription = subscriptionManager.getSubscription(subscription.getId()); + if(subscription !=null){ + //send the responce + SOAPEnvelope soapEnvelope = messageBuilder.genGetStatusResponse(subscription); + String replyAddress = mc.getOptions().getReplyTo().getAddress(); + AddressEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition def = new EndpointDefinition(); + def.setAddress(replyAddress.trim()); + endpoint.setDefinition(def); + org.apache.synapse.MessageContext rmc = new Axis2MessageContext(mc, synCfg, synEnv); + rmc.setEnvelope(soapEnvelope); + rmc.setTo(new EndpointReference(replyAddress)); + rmc.setWSAAction(EventingConstants.WSE_GET_STATUS_RESPONSE); + rmc.setSoapAction(EventingConstants.WSE_GET_STATUS_RESPONSE); + RelatesTo relatesTo = new RelatesTo(subscription.getId()); + rmc.setRelatesTo(new RelatesTo[] { relatesTo }); + endpoint.send(rmc); + } else { + //TODO: send the fault message + } } else if (EventingConstants.WSE_RENEW.equals(mc.getWSAAction())) { // Renew subscription + Subscription subscription = SubscriptionMessageBuilder.createRenewSubscribeMessage(smc); + if (subscriptionManager.renewSubscription(subscription)){ + //send the response + SOAPEnvelope soapEnvelope = messageBuilder.genRenewSubscriptionResponse(subscription); + String replyAddress = mc.getOptions().getReplyTo().getAddress(); + AddressEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition def = new EndpointDefinition(); + def.setAddress(replyAddress.trim()); + endpoint.setDefinition(def); + org.apache.synapse.MessageContext rmc = new Axis2MessageContext(mc, synCfg, synEnv); + rmc.setEnvelope(soapEnvelope); + rmc.setTo(new EndpointReference(replyAddress)); + rmc.setWSAAction(EventingConstants.WSE_RENEW_RESPONSE); + rmc.setSoapAction(EventingConstants.WSE_RENEW_RESPONSE); + RelatesTo relatesTo = new RelatesTo(subscription.getId()); + rmc.setRelatesTo(new RelatesTo[] { relatesTo }); + endpoint.send(rmc); + }else{ + //TODO: send the fault message + } } else { // Treat as an Event List<Subscription> subscribers = subscriptionManager.getMatchingSubscribers(smc); Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java?rev=24751&r1=24750&r2=24751&view=diff ============================================================================== --- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java (original) +++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java Mon Nov 24 11:54:27 2008 @@ -40,6 +40,8 @@ public abstract boolean deleteSubscription(String id); + public abstract boolean renewSubscription(Subscription subscription); + public abstract void init(); public void addProperty(String name, String value) { Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java?rev=24751&r1=24750&r2=24751&view=diff ============================================================================== --- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java (original) +++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java Mon Nov 24 11:54:27 2008 @@ -55,6 +55,7 @@ private static final QName ATT_XPATH = new QName(XMLConfigConstants.NULL_NAMESPACE, EventingConstants.WSE_EN_XPATH); private static final QName IDENTIFIER = new QName(EventingConstants.WSE_EVENTING_NS, EventingConstants.WSE_EN_IDENTIFIER); private static final QName EXPIRES = new QName(EventingConstants.WSE_EVENTING_NS, EventingConstants.WSE_EN_EXPIRES); + private static final QName RENEW = new QName(EventingConstants.WSE_EVENTING_NS, EventingConstants.WSE_EN_RENEW); /** * (01) <s12:Envelope @@ -258,10 +259,13 @@ OMElement elem = mc.getEnvelope().getHeader().getFirstChildWithName(IDENTIFIER); String id = (String) elem.getText(); subscription.setId(id); - OMElement expiryElem = elem.getFirstChildWithName(EXPIRES); - if (expiryElem != null) { - Calendar calendar = ConverterUtil.convertToDateTime(expiryElem.getText()); - subscription.setExpires(calendar); + OMElement renewElem = mc.getEnvelope().getBody().getFirstChildWithName(RENEW); + if (renewElem != null) { + OMElement expiryElem = renewElem.getFirstChildWithName(EXPIRES); + if (expiryElem != null) { + Calendar calendar = ConverterUtil.convertToDateTime(expiryElem.getText()); + subscription.setExpires(calendar); + } } return subscription; } @@ -292,7 +296,8 @@ * (23) <s12:Body> * (24) <wse:GetStatus /> * (25) </s12:Body> - * (26) </s12:Envelope> + * (26) </s12:Envelope> + * * @param mc * @return */ Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java?rev=24751&r1=24750&r2=24751&view=diff ============================================================================== --- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java (original) +++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java Mon Nov 24 11:54:27 2008 @@ -63,6 +63,20 @@ } } + /** + * Renew the subscription by setting the expire date time + * @param subscription + * @return + */ + public boolean renewSubscription(Subscription subscription){ + Subscription subscriptionOld = getSubscription(subscription.getId()); + if (subscriptionOld !=null){ + subscriptionOld.setExpires(subscription.getExpires()); + return true; + }else{ + return false; + } + } public List<Subscription> getSubscribers() { LinkedList<Subscription> list = new LinkedList<Subscription>(); for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) { @@ -72,7 +86,8 @@ } public List<Subscription> getMatchingSubscribers(MessageContext mc) { - LinkedList<Subscription> list = new LinkedList<Subscription>(); + //TODO : check the expiration + LinkedList<Subscription> list = new LinkedList<Subscription>(); for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) { XPathBasedEventFilter filter =(XPathBasedEventFilter) stringSubscriptionEntry.getValue().getFilter(); filter.setSourceXpath(topicXPath); Modified: branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSubscriber.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSubscriber.java?rev=24751&r1=24750&r2=24751&view=diff ============================================================================== --- branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSubscriber.java (original) +++ branches/synapse/1.2.wso2v1/modules/samples/src/main/java/samples/userguide/EventSubscriber.java Mon Nov 24 11:54:27 2008 @@ -63,6 +63,7 @@ String address = getProperty("address", "http://localhost:9000/soap/SimpleStockQuoteService"); String mode = getProperty("mode", "subscribe"); String identifier = getProperty("identifier", "90000"); + String expires = getProperty("expires", "2008-12-31T21:07:00.000-08:00"); if (repo != null && !"null".equals(repo)) { configContext = @@ -89,7 +90,7 @@ OMElement addressOm = factory.createOMElement("Address", nswsa); factory.createOMText(addressOm, address); OMElement expiresOm = factory.createOMElement("Expires", nswse); - factory.createOMText(expiresOm, "2008-12-26T21:07:00.000-08:00"); + factory.createOMText(expiresOm, expires); OMElement filterOm = factory.createOMElement("Filter", nswse); filterOm.addAttribute(factory.createOMAttribute("Dialect", null, "http://synapse.apache.org/eventing/dialect/topicFilter")); factory.createOMText(filterOm, topic); @@ -108,7 +109,7 @@ options.setAction("http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe"); serviceClient.setOptions(options); - System.out.println("Subscribing \n" + subscribeOm.toString()); + System.out.println("Subscribing \n" + subscribeOm.toString()); OMElement response = serviceClient.sendReceive(subscribeOm); System.out.println("Subscribed to topic " + topic); Thread.sleep(1000); @@ -142,10 +143,8 @@ (25) </s12:Body> (26) </s12:Envelope>*/ OMElement subscribeOm = factory.createOMElement("Unsubscribe", nswse); - serviceClient.engageModule("addressing"); options.setTo(new EndpointReference(addUrl)); - options.setAction("http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe"); OMElement identifierOm = factory.createOMElement("Identifier", nswse); factory.createOMText(identifierOm, identifier); @@ -154,9 +153,99 @@ System.out.println("UnSubscribing \n" + subscribeOm.toString()); OMElement response = serviceClient.sendReceive(subscribeOm); System.out.println("UnSubscribed to ID " + identifier); - Thread.sleep(5000); - System.out.println("Un Subscribe Response Received: " + response.toString()); + Thread.sleep(1000); + System.out.println("UnSubscribe Response Received: " + response.toString()); + + } else if (mode.equals("renew")) { + /** + * (01) <s12:Envelope + (02) xmlns:s12="http://www.w3.org/2003/05/soap-envelope" + (03) xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing" + (04) xmlns:wse="http://schemas.xmlsoap.org/ws/2004/08/eventing" + (05) xmlns:ow="http://www.example.org/oceanwatch" > + (06) <s12:Header> + (07) <wsa:Action> + (08) http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew + (09) </wsa:Action> + (10) <wsa:MessageID> + (11) uuid:bd88b3df-5db4-4392-9621-aee9160721f6 + (12) </wsa:MessageID> + (13) <wsa:ReplyTo> + (14) <wsa:Address>http://www.example.com/MyEventSink</wsa:Address> + (15) </wsa:ReplyTo> + (16) <wsa:To> + (17) http://www.example.org/oceanwatch/SubscriptionManager + (18) </wsa:To> + (19) <wse:Identifier> + (20) uuid:22e8a584-0d18-4228-b2a8-3716fa2097fa + (21) </wse:Identifier> + (22) </s12:Header> + (23) <s12:Body> + (24) <wse:Renew> + (25) <wse:Expires>2004-06-26T21:07:00.000-08:00</wse:Expires> + (26) </wse:Renew> + (27) </s12:Body> + (28) </s12:Envelope> + */ + OMElement subscribeOm = factory.createOMElement("Renew", nswse); + OMElement expiresOm = factory.createOMElement("Expires", nswse); + factory.createOMText(expiresOm, expires); + subscribeOm.addChild(expiresOm); + serviceClient.engageModule("addressing"); + options.setTo(new EndpointReference(addUrl)); + options.setAction("http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew"); + OMElement identifierOm = factory.createOMElement("Identifier", nswse); + factory.createOMText(identifierOm, identifier); + serviceClient.addHeader(identifierOm); + serviceClient.setOptions(options); + System.out.println("Subscription Renew \n" + subscribeOm.toString()); + OMElement response = serviceClient.sendReceive(subscribeOm); + System.out.println("Subscription Renew to ID " + identifier); + Thread.sleep(1000); + System.out.println("Subscription Renew Response Received: " + response.toString()); + } else if (mode.equals("getstatus")) { + /** + * (01) <s12:Envelope + (02) xmlns:s12="http://www.w3.org/2003/05/soap-envelope" + (03) xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing" + (04) xmlns:wse="http://schemas.xmlsoap.org/ws/2004/08/eventing" + (05) xmlns:ow="http://www.example.org/oceanwatch" > + (06) <s12:Header> + (07) <wsa:Action> + (08) http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus + (09) </wsa:Action> + (10) <wsa:MessageID> + (11) uuid:bd88b3df-5db4-4392-9621-aee9160721f6 + (12) </wsa:MessageID> + (13) <wsa:ReplyTo> + (14) <wsa:Address>http://www.example.com/MyEventSink</wsa:Address> + (15) </wsa:ReplyTo> + (16) <wsa:To> + (17) http://www.example.org/oceanwatch/SubscriptionManager + (18) </wsa:To> + (19) <wse:Identifier> + (20) uuid:22e8a584-0d18-4228-b2a8-3716fa2097fa + (21) </wse:Identifier> + (22) </s12:Header> + (23) <s12:Body> + (24) <wse:GetStatus /> + (25) </s12:Body> + (26) </s12:Envelope> + */ + OMElement subscribeOm = factory.createOMElement("GetStatus", nswse); + serviceClient.engageModule("addressing"); + options.setTo(new EndpointReference(addUrl)); + options.setAction("http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus"); + OMElement identifierOm = factory.createOMElement("Identifier", nswse); + factory.createOMText(identifierOm, identifier); + serviceClient.addHeader(identifierOm); + serviceClient.setOptions(options); + System.out.println("GetStatus using \n" + subscribeOm.toString()); + OMElement response = serviceClient.sendReceive(subscribeOm); + System.out.println("GetStatus to ID " + identifier); + Thread.sleep(1000); + System.out.println("GetStatus Response Received: " + response.toString()); } try { Modified: branches/synapse/1.2.wso2v1/modules/samples/src/main/scripts/build.xml URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/samples/src/main/scripts/build.xml?rev=24751&r1=24750&r2=24751&view=diff ============================================================================== --- branches/synapse/1.2.wso2v1/modules/samples/src/main/scripts/build.xml (original) +++ branches/synapse/1.2.wso2v1/modules/samples/src/main/scripts/build.xml Mon Nov 24 11:54:27 2008 @@ -97,6 +97,7 @@ <property name="topicns" value=""/> <property name="mode" value=""/> <property name="identifier" value=""/> + <property name="expires" value=""/> <target name="clean"> <delete dir="target" quiet="true"/> @@ -177,6 +178,7 @@ <sysproperty key="repository" value="${repository}"/> <sysproperty key="mode" value="${mode}"/> <sysproperty key="identifier" value="${identifier}"/> + <sysproperty key="expires" value="${expires}"/> <sysproperty key="java.io.tmpdir" value="./../../work/temp/sampleClient"/> </java> </target> _______________________________________________ Esb-java-dev mailing list [email protected] https://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev
