http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java deleted file mode 100644 index 072cbb3..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package org.apache.airavata.wsmg.messenger;

import java.io.StringReader;

import javax.xml.stream.XMLStreamException;

import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
import org.apache.airavata.wsmg.messenger.protocol.SendingException;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axis2.addressing.EndpointReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Delivers a notification body to a single consumer through the configured
 * {@link DeliveryProtocol} and reports the outcome (with the elapsed time) to the
 * {@link ConsumerUrlManager}.
 *
 * <p>
 * This class is not thread safe.
 */
public class SenderUtils implements Deliverable {

    private static final Logger logger = LoggerFactory.getLogger(SenderUtils.class);

    private static final OMFactory factory = OMAbstractFactory.getOMFactory();

    private ConsumerUrlManager urlManager;

    private DeliveryProtocol protocol;

    /**
     * @param urlMan
     *            manager that is asked whether a consumer URL is unavailable and that is told about every
     *            successful or failed delivery
     */
    public SenderUtils(ConsumerUrlManager urlMan) {
        urlManager = urlMan;
    }

    /**
     * Sets the protocol used by {@link #send}. It must be set before the first call to {@code send}.
     *
     * @param protocol
     *            the delivery protocol
     */
    public void setProtocol(DeliveryProtocol protocol) {
        this.protocol = protocol;
    }

    /**
     * Sends one notification to one consumer.
     *
     * <p>
     * Nothing is sent when the consumer is paused, when the body is {@code null}, or when the URL manager
     * reports the consumer URL as unavailable. For a {@code "wsnt"} consumer that uses Notify, the body is
     * first wrapped by {@link #wrapRawMessageToWsntWrappedFormat}; in every other case the body is sent as
     * is. A {@link SendingException} is not propagated: it is handed to
     * {@code urlManager.onFailedDelivery(...)}.
     *
     * @param consumerInfo
     *            the target consumer
     * @param notificationMessageBodyEl
     *            the notification body; {@code null} is ignored
     * @param additionalMessageContent
     *            header-level data (topic, producer reference, track id, ...) that accompanies the body
     */
    public void send(ConsumerInfo consumerInfo, OMElement notificationMessageBodyEl,
            AdditionalMessageContent additionalMessageContent) {

        if (consumerInfo.isPaused()) {
            return;
        }

        if (notificationMessageBodyEl == null) {
            logger.info("notification message is null, IGNORED");
            return;
        }

        if (urlManager.isUnavailable(consumerInfo.getConsumerEprStr())) {
            logger.info("consumer url is unavailable: {}", consumerInfo.getConsumerEprStr());
            return;
        }

        EndpointReference consumerReference = new EndpointReference(consumerInfo.getConsumerEprStr());

        /*
         * Extract message. Only wsnt consumers that asked for Notify get the wrapped form; wse consumers and
         * raw wsnt consumers receive the body unchanged. The constant-first equals() keeps a null type from
         * throwing; such a consumer is treated like a wse consumer.
         */
        OMElement message = notificationMessageBodyEl;
        if ("wsnt".equals(consumerInfo.getType()) && consumerInfo.isUseNotify()) {
            message = wrapRawMessageToWsntWrappedFormat(notificationMessageBodyEl, additionalMessageContent);
        }

        long startTime = System.currentTimeMillis();

        try {

            /*
             * sending message out
             */
            protocol.deliver(consumerInfo, message, additionalMessageContent);

            long timeElapsed = System.currentTimeMillis() - startTime;
            if (WSMGParameter.showTrackId) {
                logger.info(String.format("track id = %s : delivered to: %s in %d ms",
                        additionalMessageContent.getTrackId(), consumerReference.getAddress(), timeElapsed));
            }

            urlManager.onSucessfullDelivery(consumerReference, timeElapsed);
        } catch (SendingException ex) {

            long finishTime = System.currentTimeMillis();
            long timeElapsed = finishTime - startTime;

            urlManager.onFailedDelivery(consumerReference, finishTime, timeElapsed, ex, additionalMessageContent);
        }
    }

    /**
     * Builds a WS-Notification {@code Notify} element around a raw notification.
     *
     * <p>
     * The result is {@code Notify/NotificationMessage}, containing (in this order) the parsed topic element
     * and the parsed producer reference when the corresponding strings are present and parse successfully,
     * followed by a {@code Message} element that holds {@code rawNotif}. A string that fails to parse is
     * logged and skipped; it no longer leads to {@code addChild} being called with {@code null}.
     *
     * @param rawNotif
     *            the raw notification body, added as the child of {@code Message}
     * @param additionalMessageContent
     *            source of the topic element and producer reference strings
     * @return the {@code Notify} element
     */
    public OMElement wrapRawMessageToWsntWrappedFormat(OMElement rawNotif,
            AdditionalMessageContent additionalMessageContent) {

        OMElement fullNotif = factory.createOMElement("Notify", NameSpaceConstants.WSNT_NS);

        OMElement notificationMessageEl = factory.createOMElement("NotificationMessage", NameSpaceConstants.WSNT_NS,
                fullNotif);

        appendParsedChild(notificationMessageEl, additionalMessageContent.getTopicElement(),
                "XMLStreamreader exception when setting topicEl");
        appendParsedChild(notificationMessageEl, additionalMessageContent.getProducerReference(),
                "XMLStreamException at creating producerReferenceEl");

        OMElement messageEl = factory.createOMElement("Message", NameSpaceConstants.WSNT_NS, notificationMessageEl);
        messageEl.addChild(rawNotif);

        return fullNotif;
    }

    /**
     * Parses {@code xml} and appends the resulting element to {@code parent}; does nothing when {@code xml}
     * is {@code null} and only logs {@code parseFailureMessage} when parsing fails.
     */
    private static void appendParsedChild(OMElement parent, String xml, String parseFailureMessage) {
        if (xml == null) {
            return;
        }
        OMElement parsed = null;
        try {
            parsed = CommonRoutines.reader2OMElement(new StringReader(xml));
        } catch (XMLStreamException e) {
            logger.error(parseFailureMessage, e);
        }
        // Only a successfully parsed element may be attached; a failed parse leaves nothing to add.
        if (parsed != null) {
            parent.addChild(parsed);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java deleted file mode 100644 index 417cef7..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.airavata.wsmg.messenger.protocol; - -import org.apache.airavata.wsmg.broker.AdditionalMessageContent; -import org.apache.airavata.wsmg.broker.ConsumerInfo; -import org.apache.axiom.om.OMElement; - -public interface DeliveryProtocol { - - public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent) - throws SendingException; - - public void setTimeout(long timeout); -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java deleted file mode 100644 index c4dd24a..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.airavata.wsmg.messenger.protocol; - -import org.apache.axis2.AxisFault; - -public class SendingException extends AxisFault { - - /** - * - */ - private static final long serialVersionUID = 6250791562500752579L; - - public SendingException(Throwable cause) { - super(cause); - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java deleted file mode 100644 index 7e2568a..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package org.apache.airavata.wsmg.messenger.protocol.impl;

import java.io.StringReader;
import java.util.LinkedList;
import java.util.List;

import javax.xml.stream.XMLStreamException;

import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
import org.apache.airavata.wsmg.messenger.protocol.SendingException;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.ElementHelper;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axiom.soap.SOAPHeaderBlock;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.transport.http.HTTPConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DeliveryProtocol} that sends messages with an Axis2 {@link ServiceClient}.
 *
 * <p>
 * A single {@code ServiceClient} instance is created lazily and reused for every delivery; the lazy
 * initialisation and the per-call header reset are not synchronized.
 */
public class Axis2Protocol implements DeliveryProtocol {

    private static final Logger logger = LoggerFactory.getLogger(Axis2Protocol.class);

    private static SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();

    private ServiceClient nonThreadLocalServiceClient;

    long tcpConnectionTimeout;

    /**
     * Sends {@code message} to the consumer with {@code sendRobust}.
     *
     * <p>
     * For a {@code "wsnt"} consumer the action is the WSNT namespace URI followed by {@code /Notify}. For
     * any other consumer (wse) the action comes from {@code additionalMessageContent} and the topic
     * element, if present and parseable, is sent as an extra SOAP header.
     *
     * @param consumerInfo
     *            the consumer to deliver to
     * @param message
     *            the payload to send
     * @param additionalMessageContent
     *            source of the action, message id and topic element
     * @throws SendingException
     *             if Axis2 reports a fault; it wraps the fault's cause, or the fault itself when the fault
     *             has no cause, so the failure reason is never lost
     */
    public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
            throws SendingException {
        EndpointReference consumerReference = new EndpointReference(consumerInfo.getConsumerEprStr());

        /*
         * Extract information
         */
        String actionString = null;
        List<OMElement> soapHeaders = new LinkedList<OMElement>();
        // Constant-first equals() keeps a null type from throwing; it is then handled like wse.
        if ("wsnt".equals(consumerInfo.getType())) {
            actionString = NameSpaceConstants.WSNT_NS.getNamespaceURI() + "/Notify";
        } else { // wse
            actionString = additionalMessageContent.getAction();
            String topicElString = additionalMessageContent.getTopicElement();
            if (topicElString != null) {
                try {
                    OMElement topicEl = CommonRoutines.reader2OMElement(new StringReader(topicElString));
                    soapHeaders.add(topicEl);
                } catch (XMLStreamException e) {
                    logger.error("exception at topicEl xmlStreamException", e);
                }
            }
        }

        ServiceClient client = null;
        try {

            client = configureServiceClient(actionString, consumerReference, additionalMessageContent.getMessageID(),
                    soapHeaders);

            client.sendRobust(message);

        } catch (AxisFault ex) {
            // Prefer the root cause (as before), but fall back to the fault itself instead of wrapping null.
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            throw new SendingException(cause);
        } finally {
            if (client != null) {
                cleanUp(client);
            }
        }
    }

    /**
     * Stores the timeout that is applied to the client options of every subsequent delivery.
     *
     * @param timeout
     *            timeout in milliseconds (passed to {@code Options.setTimeOutInMilliSeconds})
     */
    public void setTimeout(long timeout) {
        this.tcpConnectionTimeout = timeout;
    }

    /**
     * Runs both clean-up steps independently so that a failure of the first does not skip the second.
     */
    private void cleanUp(ServiceClient client) {
        try {
            client.cleanup();
        } catch (AxisFault ex) {
            logger.error(ex.getMessage(), ex);
        }
        try {
            client.cleanupTransport();
        } catch (AxisFault ex) {
            logger.error(ex.getMessage(), ex);
        }
    }

    /**
     * Prepares the shared client for one delivery: sets the WS-Addressing {@code Action}, {@code MessageID}
     * and {@code To} headers by hand, adds the caller-supplied headers, and installs fresh options
     * (timeout, message id, target, action, no chunking, HTTP 1.0).
     */
    private ServiceClient configureServiceClient(String action, EndpointReference consumerLocation, String msgId,
            List<OMElement> soapHeaders) throws AxisFault {

        // not engaging addressing modules
        ServiceClient client = getServiceClient();

        SOAPHeaderBlock msgIdEl = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
        msgIdEl.setText(msgId);
        SOAPHeaderBlock actionEl = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
        actionEl.setText(action);

        SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
        to.setText(consumerLocation.getAddress());

        client.addHeader(actionEl);
        client.addHeader(msgIdEl);
        client.addHeader(to);

        for (OMElement omHeader : soapHeaders) {
            try {
                SOAPHeaderBlock headerBlock = ElementHelper.toSOAPHeaderBlock(omHeader, soapfactory);
                client.addHeader(headerBlock);
            } catch (Exception e) {
                throw AxisFault.makeFault(e);
            }
        }

        Options opts = new Options();
        opts.setTimeOutInMilliSeconds(tcpConnectionTimeout);
        opts.setMessageId(msgId);
        opts.setTo(consumerLocation);
        opts.setAction(action);
        opts.setProperty(HTTPConstants.CHUNKED, Boolean.FALSE);
        opts.setProperty(HTTPConstants.HTTP_PROTOCOL_VERSION, HTTPConstants.HEADER_PROTOCOL_10);
        client.setOptions(opts);

        return client;
    }

    /**
     * Returns the shared client, creating it on first use, with the headers of the previous delivery
     * removed.
     */
    private ServiceClient getServiceClient() throws AxisFault {
        if (nonThreadLocalServiceClient == null) {
            nonThreadLocalServiceClient = new ServiceClient();
        }
        nonThreadLocalServiceClient.removeHeaders();
        return nonThreadLocalServiceClient;
    }
}
See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.messenger.strategy; - -import org.apache.airavata.wsmg.commons.OutGoingMessage; -import org.apache.airavata.wsmg.messenger.Deliverable; - -public interface SendingStrategy { - void init(); - - void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable); - - void shutdown(); -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java deleted file mode 100644 index 5236f47..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package org.apache.airavata.wsmg.messenger.strategy.impl;

import java.io.StringReader;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.messenger.Deliverable;
import org.apache.axiom.om.OMElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class of the per-consumer workers: it owns the queue of pending messages for one consumer URL and
 * knows how to push a batch of them through a {@link Deliverable}. Subclasses supply {@link #run()}.
 */
public abstract class ConsumerHandler implements Runnable {

    // Log under this class's own name (the original reused FixedParallelSender's category by mistake).
    private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);

    /** Messages waiting to be sent to this consumer. */
    protected LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();

    private String consumerUrl;

    private Deliverable deliverable;

    /**
     * @param url
     *            the consumer URL this handler serves
     * @param deliverable
     *            the object used to send each message
     */
    public ConsumerHandler(String url, Deliverable deliverable) {
        this.consumerUrl = url;
        this.deliverable = deliverable;
    }

    /**
     * @return the consumer URL this handler serves
     */
    public String getConsumerUrl() {
        return consumerUrl;
    }

    /**
     * Appends a message to this consumer's queue.
     *
     * <p>
     * If the calling thread is interrupted while adding, the message is not queued; the failure is logged
     * and the thread's interrupt status is restored so the caller can still observe the interruption.
     *
     * @param msg
     *            the message to queue
     */
    public void submitMessage(LightweightMsg msg) {
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            log.error("Interrupted when trying to add message", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends every message of {@code list} in order. Each payload is parsed into an {@link OMElement} and
     * passed to the {@link Deliverable}; a failure for one message is logged and does not stop the
     * remaining messages from being sent.
     *
     * @param list
     *            the messages to send
     */
    protected void send(List<LightweightMsg> list) {
        for (LightweightMsg m : list) {
            try {
                OMElement messageToSend = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
                deliverable.send(m.getConsumerInfo(), messageToSend, m.getHeader());
            } catch (Exception e) {
                // Deliberately broad: one bad message must not abort the rest of the batch.
                log.error(e.getMessage(), e);
            }
        }
    }
}
package org.apache.airavata.wsmg.messenger.strategy.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
import org.apache.airavata.wsmg.messenger.Deliverable;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SendingStrategy} that keeps one {@link ConsumerHandler} per consumer URL and runs the handlers on
 * a fixed-size thread pool. A scheduler thread periodically submits every handler that is not currently
 * scheduled; each run of a handler sends at most {@code batchSize} messages.
 */
public class FixedParallelSender implements SendingStrategy {

    private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);

    private static final long TIME_TO_WAIT_FOR_SHUTDOWN_SECOND = 30;

    /** Handlers by consumer URL; also the lock that guards both maps. */
    private final HashMap<String, ConsumerHandler> activeConsumerHandlers = new HashMap<String, ConsumerHandler>();

    /** For every active handler, whether it is currently submitted to the pool. */
    private final HashMap<String, Boolean> submittedConsumerHandlers = new HashMap<String, Boolean>();

    private final int batchSize;

    private final ExecutorService threadPool;

    // Written by shutdown() and read by the scheduler thread, hence volatile.
    private volatile boolean stop;

    private Thread t;

    /**
     * @param poolsize
     *            number of threads in the pool that runs the handlers
     * @param batchsize
     *            maximum number of messages a handler sends per run
     */
    public FixedParallelSender(int poolsize, int batchsize) {
        this.threadPool = Executors.newFixedThreadPool(poolsize);
        this.batchSize = batchsize;
    }

    /** Starts the scheduler thread. */
    public void init() {
        this.t = new Thread(new ChooseHandlerToSubmit());
        this.t.start();
    }

    /**
     * Queues {@code outMessage} once for every consumer in its consumer list.
     *
     * @param outMessage
     *            the message and its consumers
     * @param deliverable
     *            used by a newly created handler to send its messages
     */
    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
        List<ConsumerInfo> consumerInfoList = outMessage.getConsumerInfoList();
        for (ConsumerInfo consumer : consumerInfoList) {
            sendToConsumerHandler(consumer, outMessage, deliverable);
        }
    }

    /**
     * Asks the scheduler thread to stop, waits for it to finish (it ends once no active handler remains),
     * then shuts the pool down and waits up to {@value #TIME_TO_WAIT_FOR_SHUTDOWN_SECOND} seconds for it to
     * terminate. Safe to call when {@link #init()} was never called. If the calling thread is interrupted
     * while waiting, its interrupt status is restored before returning.
     */
    public void shutdown() {
        log.debug("Shutting down");
        this.stop = true;
        boolean interrupted = false;

        if (this.t != null) {
            try {
                this.t.join();
            } catch (InterruptedException ie) {
                interrupted = true;
                log.error("Wait for ChooseHandlerToSubmit thread to finish (join) is interrupted");
            }
        }

        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(TIME_TO_WAIT_FOR_SHUTDOWN_SECOND, TimeUnit.SECONDS)) {
                log.warn("Thread pool did not terminate within {} seconds", TIME_TO_WAIT_FOR_SHUTDOWN_SECOND);
            }
        } catch (InterruptedException ie) {
            interrupted = true;
            log.error("Interrupted while waiting thread pool to shutdown");
        }

        // Restore the flag only at the end so that an interrupted join() does not cut the pool wait short.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        log.debug("Shut down");
    }

    /**
     * Queues one message on the handler of {@code consumer}, creating and registering the handler (as "not
     * submitted") when the consumer URL has none yet.
     */
    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {

        String consumerUrl = consumer.getConsumerEprStr();

        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
                message.getAdditionalMessageContent());

        synchronized (activeConsumerHandlers) {
            ConsumerHandler handler = activeConsumerHandlers.get(consumerUrl);
            if (handler == null) {
                handler = new FixedParallelConsumerHandler(consumerUrl, deliverable);
                activeConsumerHandlers.put(consumerUrl, handler);
                submittedConsumerHandlers.put(consumerUrl, Boolean.FALSE);
            }
            handler.submitMessage(lwm);
        }
    }

    /** Reads the handler count under the same lock that guards every modification of the map. */
    private boolean hasActiveHandlers() {
        synchronized (activeConsumerHandlers) {
            return !activeConsumerHandlers.isEmpty();
        }
    }

    /**
     * Scheduler loop: once per second, submits every active handler that is not already scheduled.
     */
    class ChooseHandlerToSubmit implements Runnable {
        private static final int SLEEP_TIME_SECONDS = 1;

        public void run() {
            /*
             * If stop is true, we will not get any message to send from addMessageToSend() method. So,
             * activeConsumerHandlers size will not increase but decrease only. When shutdown() is invoked, we will have
             * to send out all messages in our queue.
             */
            while (!stop || hasActiveHandlers()) {

                synchronized (activeConsumerHandlers) {
                    for (String key : activeConsumerHandlers.keySet()) {

                        /*
                         * If consumer handlers is not scheduled to run, submit it to thread pool.
                         */
                        if (!Boolean.TRUE.equals(submittedConsumerHandlers.get(key))) {
                            threadPool.submit(activeConsumerHandlers.get(key));
                            submittedConsumerHandlers.put(key, Boolean.TRUE);
                        }
                    }
                }

                try {
                    TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
                } catch (InterruptedException e) {
                    // The flag is intentionally not restored: this loop must keep draining until no
                    // handler is left, and a set flag would make every following sleep() fail at once.
                    log.error("interrupted while waiting", e);
                }
            }
        }
    }

    /**
     * Handler whose run sends a single batch and then either stays registered (messages remain) or
     * unregisters itself (queue empty).
     */
    class FixedParallelConsumerHandler extends ConsumerHandler {

        public FixedParallelConsumerHandler(String url, Deliverable deliverable) {
            super(url, deliverable);
        }

        public void run() {

            log.debug(String.format("FixedParallelConsumerHandler starting: %s", getConsumerUrl()));

            List<LightweightMsg> localList = new ArrayList<LightweightMsg>();

            queue.drainTo(localList, batchSize);

            send(localList);
            localList.clear();

            /*
             * Remove handler if and only if there is no message
             */
            synchronized (activeConsumerHandlers) {

                /*
                 * all message is sent or not, we will set it as not submitted. So, it can be put back to thread pool.
                 */
                submittedConsumerHandlers.put(getConsumerUrl(), Boolean.FALSE);

                if (queue.size() == 0) {
                    submittedConsumerHandlers.remove(getConsumerUrl());
                    activeConsumerHandlers.remove(getConsumerUrl());

                    log.debug(String.format("Consumer handler is already removed: %s", getConsumerUrl()));
                }
            }

            log.debug(String.format("FixedParallelConsumerHandler done: %s,", getConsumerUrl()));
        }
    }
}
- * - */ - -package org.apache.airavata.wsmg.messenger.strategy.impl; - -import org.apache.airavata.wsmg.broker.AdditionalMessageContent; -import org.apache.airavata.wsmg.broker.ConsumerInfo; - -class LightweightMsg { - private ConsumerInfo consumerInfo; - private String payload; - private AdditionalMessageContent header; - - public LightweightMsg(ConsumerInfo c, String pld, AdditionalMessageContent h) { - consumerInfo = c; - payload = pld; - header = h; - } - - public String getPayLoad() { - return payload; - } - - public ConsumerInfo getConsumerInfo() { - return consumerInfo; - } - - public AdditionalMessageContent getHeader() { - return header; - } - - public String toString() { - return String.format("header: %s, consumer: %s, pld: %s", header, consumerInfo.getConsumerEprStr(), payload); - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java deleted file mode 100644 index cede65d..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
package org.apache.airavata.wsmg.messenger.strategy.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
import org.apache.airavata.wsmg.messenger.Deliverable;
import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Each subscriber (URL Endpoint) will have its own thread to send a message to.
 *
 * <p>
 * A handler is created and submitted to a cached thread pool when the first message for a consumer URL
 * arrives; it ends itself after several consecutive polls without a message.
 */
public class ParallelSender implements SendingStrategy {

    private static final Logger log = LoggerFactory.getLogger(ParallelSender.class);

    private static final long TIME_TO_WAIT_FOR_SHUTDOWN_SECOND = 30;

    /** Running handlers by consumer URL; also the lock that guards the map. */
    private final HashMap<String, ConsumerHandler> activeConsumerHandlers = new HashMap<String, ConsumerHandler>();

    /** Created by {@link #init()}; {@code null} until then. */
    private ExecutorService threadPool;

    /** Creates the thread pool. Must be called before messages are added. */
    public void init() {
        this.threadPool = Executors.newCachedThreadPool();
    }

    /**
     * Queues {@code outMessage} once for every consumer in its consumer list.
     *
     * @param outMessage
     *            the message and its consumers
     * @param deliverable
     *            used by a newly created handler to send its messages
     * @throws IllegalStateException
     *             if there is at least one consumer and {@link #init()} has not been called
     */
    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
        List<ConsumerInfo> consumerInfoList = outMessage.getConsumerInfoList();
        for (ConsumerInfo consumer : consumerInfoList) {
            sendToConsumerHandler(consumer, outMessage, deliverable);
        }
    }

    /**
     * Shuts the pool down and waits up to {@value #TIME_TO_WAIT_FOR_SHUTDOWN_SECOND} seconds for it to
     * terminate. Does nothing when {@link #init()} was never called. If the calling thread is interrupted
     * while waiting, its interrupt status is restored.
     */
    public void shutdown() {
        log.debug("Shutting down");

        if (threadPool == null) {
            // init() never ran, so there is no pool and no handler to wait for.
            log.debug("Shut down");
            return;
        }

        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(TIME_TO_WAIT_FOR_SHUTDOWN_SECOND, TimeUnit.SECONDS)) {
                log.warn("Thread pool did not terminate within {} seconds", TIME_TO_WAIT_FOR_SHUTDOWN_SECOND);
            }
        } catch (InterruptedException ie) {
            log.error("Interrupted while waiting thread pool to shutdown");
            Thread.currentThread().interrupt();
        }
        log.debug("Shut down");
    }

    /**
     * Queues one message on the handler of {@code consumer}; a consumer URL without a handler gets a new
     * one, which receives the message and is then submitted to the pool.
     */
    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
        // Fail before touching any state: otherwise a handler would be registered but never started,
        // and every later message for that URL would be queued on it forever.
        if (threadPool == null) {
            throw new IllegalStateException("init() must be called before messages are added");
        }

        String consumerUrl = consumer.getConsumerEprStr();

        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
                message.getAdditionalMessageContent());

        synchronized (activeConsumerHandlers) {
            ConsumerHandler handler = activeConsumerHandlers.get(consumerUrl);
            if (handler == null) {
                handler = new ParallelConsumerHandler(consumerUrl, deliverable);
                activeConsumerHandlers.put(consumerUrl, handler);
                handler.submitMessage(lwm);
                threadPool.submit(handler);
            } else {
                handler.submitMessage(lwm);
            }
        }
    }

    /**
     * Handler that loops: drain the queue, send what was drained, and sleep one second after an empty
     * drain. After {@value #MAX_UNSUCCESSFUL_DRAINS} consecutive empty drains it unregisters itself and
     * ends, provided the queue is still empty at that moment.
     */
    class ParallelConsumerHandler extends ConsumerHandler {

        private static final int MAX_UNSUCCESSFUL_DRAINS = 3;
        private static final int SLEEP_TIME_SECONDS = 1;
        private int numberOfUnsuccessfulDrain = 0;

        public ParallelConsumerHandler(String url, Deliverable deliverable) {
            super(url, deliverable);
        }

        public void run() {
            log.debug(String.format("ParallelConsumerHandler starting: %s", getConsumerUrl()));

            List<LightweightMsg> localList = new ArrayList<LightweightMsg>();
            while (true) {

                /*
                 * Try to find more message to send out
                 */
                if (queue.drainTo(localList) <= 0) {
                    numberOfUnsuccessfulDrain++;
                } else {
                    numberOfUnsuccessfulDrain = 0;
                }

                /*
                 * No new message for sometimes
                 */
                if (numberOfUnsuccessfulDrain >= MAX_UNSUCCESSFUL_DRAINS) {
                    /*
                     * Stop this thread if and only if there is no message
                     */
                    synchronized (activeConsumerHandlers) {
                        if (queue.size() == 0) {
                            if (activeConsumerHandlers.remove(getConsumerUrl()) != null) {
                                log.debug(String.format("Consumer handler is already removed: %s", getConsumerUrl()));
                            }
                            log.debug(String.format("ParallelConsumerHandler done: %s,", getConsumerUrl()));
                            break;
                        }
                    }
                }

                send(localList);
                localList.clear();

                if (numberOfUnsuccessfulDrain > 0) {
                    waitForMessages();
                }
            }
        }

        /** Sleeps for the polling interval; an interruption is only logged and the loop carries on. */
        private void waitForMessages() {
            try {
                TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
                log.debug("finished - waiting for messages");
            } catch (InterruptedException e) {
                log.error("interrupted while waiting for messages", e);
            }
        }
    }
}
- * - */ - -package org.apache.airavata.wsmg.messenger.strategy.impl; - -import java.io.StringReader; -import java.util.List; - -import javax.xml.stream.XMLStreamException; - -import org.apache.airavata.wsmg.broker.AdditionalMessageContent; -import org.apache.airavata.wsmg.broker.ConsumerInfo; -import org.apache.airavata.wsmg.commons.CommonRoutines; -import org.apache.airavata.wsmg.commons.OutGoingMessage; -import org.apache.airavata.wsmg.messenger.Deliverable; -import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy; -import org.apache.axiom.om.OMElement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SerialSender implements SendingStrategy { - - private static final Logger log = LoggerFactory.getLogger(SerialSender.class); - - public void init() { - } - - public void shutdown() { - } - - public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) { - sendNotification(outMessage, deliverable); - } - - public void sendNotification(OutGoingMessage outGoingMessage, Deliverable deliverable) { - - if (outGoingMessage == null) { - log.error("Got a null outgoing message"); - return; - } - String messageString = outGoingMessage.getTextMessage(); - - List<ConsumerInfo> consumerInfoList = outGoingMessage.getConsumerInfoList(); - AdditionalMessageContent soapHeader = outGoingMessage.getAdditionalMessageContent(); - - try { - OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(messageString)); - - for (ConsumerInfo obj : consumerInfoList) { - deliverable.send(obj, messgae2Send, soapHeader); - } - - } catch (XMLStreamException e) { - log.error(e.getMessage(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java ---------------------------------------------------------------------- diff --git 
a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java deleted file mode 100644 index 6400f63..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.util; - -import javax.xml.namespace.QName; - -import org.apache.airavata.wsmg.commons.WsmgCommonConstants; -import org.apache.axiom.om.OMAttribute; -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; - -public class BrokerUtil { - - /** - * Compares String {@code x} with String {@code y}. The result is {@code true} if and only if both arguments are - * {@code null} or String {@code x} has the same sequence of characters as String {@code y}. 
- * - * @param x - * @param y - * @return {@code true} if the String {@code x} and String {@code y} are equivalent, {@code false} otherwise - */ - public static boolean sameStringValue(String x, String y) { - return (x == null && y == null) || (x != null && y != null && x.equals(y)); - } - - public static String getTopicLocalString(String filterText) { - - if (filterText == null) - throw new IllegalArgumentException("filter text can't be null"); - - String localName = null; - - int pos = filterText.indexOf(':'); - - if (pos != -1) { - localName = filterText.substring(pos + 1); - - } else { - - localName = filterText; - } - - return localName; - } - - /** - * - * @return localString - * @throws AxisFault - */ - public static String getXPathString(OMElement xpathEl) throws AxisFault { - - if (xpathEl == null) { - throw new IllegalArgumentException("xpath element can't be null"); - } - - OMAttribute dialectAttribute = xpathEl.getAttribute(new QName("Dialect")); - - if (dialectAttribute == null) { - dialectAttribute = xpathEl.getAttribute(new QName("DIALECT")); - - } - if (dialectAttribute == null) { - throw new AxisFault("dialect is required for subscribe"); - } - String dialectString = dialectAttribute.getAttributeValue(); - if (!dialectString.equals(WsmgCommonConstants.XPATH_DIALECT)) { - // System.out.println("***Unkown dialect: " + dialectString); - throw new AxisFault("Unkown dialect: " + dialectString); - } - String xpathLocalString = xpathEl.getText(); - return xpathLocalString; - } - - public static String getTopicFromRequestPath(String topicPath) { - if (topicPath == null) - return null; - if (topicPath.length() == 0) - return null; - if (topicPath.startsWith("/")) { - topicPath = topicPath.substring(1); - if (topicPath.length() == 0) - return null; - } - - String ret = null; - - int index = topicPath.indexOf(WsmgCommonConstants.TOPIC_PREFIX); - if (index >= 0) { - - ret = topicPath.substring(index + WsmgCommonConstants.TOPIC_PREFIX.length()); - - if 
(ret.length() == 0) { - ret = null; - } - - } - - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java deleted file mode 100644 index bcd00a4..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.util; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -//Used for stress test. 
/**
 * Stress-test tally, meant to be polled by {@code TimerThread}: a running count backed by an
 * {@link AtomicLong} plus the most recently recorded string backed by an {@link AtomicReference}.
 */
public class Counter {

    private final AtomicLong hits = new AtomicLong(0);

    private final AtomicReference<String> latestValue = new AtomicReference<String>();

    /** Adds one to the count. */
    public void addCounter() {
        hits.incrementAndGet();
    }

    /**
     * Adds one to the count and records {@code otherValue} as the latest string.
     */
    public synchronized void addCounter(String otherValue) {
        hits.incrementAndGet();
        latestValue.set(otherValue);
    }

    /**
     * @return Returns the counterValue.
     */
    public long getCounterValue() {
        return hits.get();
    }

    /**
     * @param counterValue
     *            The counterValue to set.
     */
    public void setCounterValue(long counterValue) {
        hits.set(counterValue);
    }

    /**
     * @return Returns the otherValueString, {@code null} until one has been recorded.
     */
    public String getOtherValueString() {
        return latestValue.get();
    }

    /**
     * @param otherValueString
     *            The otherValueString to set.
     */
    public void setOtherValueString(String otherValueString) {
        latestValue.set(otherValueString);
    }

}
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.util; - -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.TreeSet; - -import org.apache.airavata.wsmg.commons.CommonRoutines; -import org.apache.airavata.wsmg.commons.WsmgVersion; - -public class RunTimeStatistics { - public static long totalMessageSize = 0; - public static long totalReceivedNotification = 0; - public static long totalSentOutNotification = 0; - public static long totalFailedNotification = 0; - public static long totalSubscriptions = 0; - public static long totalSubscriptionsAtStartUp = 0; - public static long totalUnSubscriptions = 0; - public static long minMessageSize = Long.MAX_VALUE; - public static long maxMessageSize = 0; - public static String startUpTime = ""; - public static long totalSuccessfulDeliveryTime = 0; - public static long totalFailedDeliveryTime = 0; - public static long minSuccessfulDeliveryTime = Long.MAX_VALUE; - public static long maxSuccessfulDeliveryTime = 0; - public static long minFailedDeliveryTime = Long.MAX_VALUE; - public static long maxFailedDeliveryTime = 0; - public static final HashMap<String, Integer> failConsumerList = new HashMap<String, Integer>(); - - // public static TreeSet currentBlackList=new TreeSet(); - // public static TreeSet previousBlackList=new TreeSet(); - - private static long startUpTimeInMillis; - - public static 
synchronized void addNewNotificationMessageSize(int size) { - if (size < minMessageSize) { - minMessageSize = size; - } - if (size > maxMessageSize) { - maxMessageSize = size; - } - totalMessageSize += size; - totalReceivedNotification++; - } - - public static synchronized void addNewSuccessfulDeliverTime(long deliveryTime) { - if (deliveryTime < minSuccessfulDeliveryTime) { - minSuccessfulDeliveryTime = deliveryTime; - } - if (deliveryTime > maxSuccessfulDeliveryTime) { - maxSuccessfulDeliveryTime = deliveryTime; - } - totalSuccessfulDeliveryTime += deliveryTime; - totalSentOutNotification++; - } - - public static synchronized void addNewFailedDeliverTime(long deliveryTime) { - if (deliveryTime < minFailedDeliveryTime) { - minFailedDeliveryTime = deliveryTime; - } - if (deliveryTime > maxFailedDeliveryTime) { - maxFailedDeliveryTime = deliveryTime; - } - totalFailedDeliveryTime += deliveryTime; - totalFailedNotification++; - } - - public static synchronized void addFailedConsumerURL(String url) { - Integer previousCount = failConsumerList.get(url); - if (previousCount == null) { - failConsumerList.put(url, 1); - } else { - previousCount++; - failConsumerList.put(url, previousCount); - } - } - - public static void setStartUpTime() { - Date currentDate = new Date(); // Current date - startUpTime = CommonRoutines.getXsdDateTime(currentDate); - startUpTimeInMillis = currentDate.getTime(); - } - - public static String getHtmlString() { - String htmlString = ""; - - htmlString += "<p>Total incoming message number: <span class=\"xml-requests-count\">" - + totalReceivedNotification + "</span><br />\n"; - htmlString += "Total successful outgoing message number: " + totalSentOutNotification + "<br>\n"; - htmlString += "Total unreachable outgoing message number: " + totalFailedNotification + "<br>\n"; - htmlString += "Total subscriptions requested: " + totalSubscriptions + "(+" + totalSubscriptionsAtStartUp - + " startUp)<br>\n"; - htmlString += "Total Unsubscriptions 
requested: " + totalUnSubscriptions + "<br>\n"; - htmlString += "</p>\n"; - int averageMessageSize = 0; - if (totalReceivedNotification != 0) { - averageMessageSize = (int) (totalMessageSize / totalReceivedNotification); - } - htmlString += "<p>Average message size: " + averageMessageSize + " bytes<br>\n"; - htmlString += "Max message size: " + maxMessageSize + " bytes<br>\n"; - htmlString += "Min message size: " + minMessageSize + " bytes<br>\n"; - htmlString += "</p>\n"; - long averageSuccessfulDeliveryTime = 0; - if (totalSuccessfulDeliveryTime != 0) { - averageSuccessfulDeliveryTime = (totalSuccessfulDeliveryTime / totalSentOutNotification); - } - htmlString += "<p>Average Successful Delivery Time: " + averageSuccessfulDeliveryTime + " ms<br>\n"; - htmlString += "Max Successful Delivery Time: " + maxSuccessfulDeliveryTime + " ms<br>\n"; - htmlString += "Min Successful Delivery Time: " + minSuccessfulDeliveryTime + " ms<br>\n"; - htmlString += "</p>\n"; - long averageFailedDeliveryTime = 0; - if (totalFailedDeliveryTime != 0) { - averageFailedDeliveryTime = (totalFailedDeliveryTime / totalFailedNotification); - } - htmlString += "<p>Average Unreachable Delivery Time: " + averageFailedDeliveryTime + " ms<br>\n"; - htmlString += "Max Unreachable Delivery Time: " + maxFailedDeliveryTime + " ms<br>\n"; - htmlString += "Min Unreachable Delivery Time: " + minFailedDeliveryTime + " ms<br>\n"; - htmlString += "</p>\n"; - htmlString += "<p>Service started at: " + startUpTime + " <span class=\"starttime-seconds\">" - + startUpTimeInMillis + "</span> [seconds] since UNIX epoch)" + "<br />\n"; - - htmlString += "Version: <span class=\"service-name\">" + WsmgVersion.getImplementationVersion() - + "</span></p>\n"; - - htmlString += "<p>Total unreachable consumerUrl: " + failConsumerList.size() + " <br>\n"; - TreeSet<String> consumerUrlList = new TreeSet<String>(failConsumerList.keySet()); - Iterator<String> iter = consumerUrlList.iterator(); - while (iter.hasNext()) { - 
String url = iter.next(); - int failedCount = failConsumerList.get(url); - htmlString += " " + url + " -->" + failedCount + " <br>\n"; - } - htmlString += "</p>\n"; - return htmlString; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java deleted file mode 100644 index 77f7c57..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.airavata.wsmg.util; - -public class TimerThread implements Runnable { - Counter counter; - - long counterValue = 0; - - long seqNum = 0; - - String comment = ""; - - public TimerThread(Counter counter, String comment) { - this.counter = counter; - this.comment = comment; - } - - public void run() { - long currentTime = 0; - long interval = 1000; - long lastCounter = 0; - long idleCount = 0; - // wait for about 5 sec and start from 000 time so that other thread can - // start together - currentTime = System.currentTimeMillis(); - long launchTime = ((currentTime + 2000) / 1000) * 1000; - long sleepTime = launchTime - currentTime; - System.out.println("launchTime=" + launchTime + " SleepTime=" + sleepTime); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - while (true) { - currentTime = System.currentTimeMillis(); - counterValue = counter.getCounterValue(); - long receivedCount = counterValue - lastCounter; - lastCounter = counterValue; - if (receivedCount == 0) { - idleCount++; - } else { - idleCount = 0; - } - if (receivedCount > 0 || (receivedCount == 0 && idleCount < 3)) { - // System.out.println("time="+currentTime+" counter="+ - // counter.getCounterValue()+" - // received="+receivedCount+comment); - System.out.println(seqNum + " " + counter.getCounterValue() + " " + receivedCount + comment - + counter.getOtherValueString()); - } - seqNum++; - launchTime = launchTime + interval; - sleepTime = launchTime - currentTime; - // System.out.println("launchTime="+launchTime+" - // SleepTime="+sleepTime); - if (sleepTime < 0) - sleepTime = 0; - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java 
---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java deleted file mode 100644 index 931a5bf..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
/**
 * WS-Eventing operations handled by the broker, each tied to its operation name.
 */
public enum WsEventingOperations {

    RENEW("renew"),
    PUBLISH("publish"),
    GET_STATUS("getStatus"),
    SUBSCRIPTION_END("subscriptionEnd"),
    SUBSCRIBE("subscribe"),
    UNSUBSCRIBE("unsubscribe");

    private final String name;

    private WsEventingOperations(String n) {
        name = n;
    }

    /** @return the operation name, not the constant's identifier */
    public String toString() {
        return name;
    }

    /**
     * Case-sensitive comparison of the operation name with {@code s}. This is an overload, not an override
     * of {@link Object#equals(Object)}.
     */
    public boolean equals(String s) {
        return name.equals(s);
    }

    /**
     * Looks an operation up by name, ignoring case.
     *
     * @throws RuntimeException
     *             when no operation has that name
     */
    public static WsEventingOperations valueFrom(String s) {
        WsEventingOperations[] operations = WsEventingOperations.values();
        for (int i = 0; i < operations.length; i++) {
            if (operations[i].name.equalsIgnoreCase(s)) {
                return operations[i];
            }
        }

        throw new RuntimeException("invalid WsEventingOperation:- " + s);
    }

}
/**
 * WS-Notification operations handled by the broker, each tied to its operation name.
 */
public enum WsNotificationOperations {

    NOTIFY("notify"),
    SUBSCRIBE("subscribe"),
    GET_CURRENT_MSG("getCurrentMessage"),
    // Was spelled "gause", which made valueFrom("pause") fail.
    PAUSE_REQUEST("pause"),
    RESUME_REQUEST("resume"),
    PAUSE_SUBSCRIPTION("pauseSubscription"),
    RESUME_SUBSCRIPTION("resumeSubscription"),
    REGISTER_PUBLISHER("registerPublisher"),
    UNSUBSCRIBE("unsubscribe");

    /** Former misspelled name of {@link #PAUSE_REQUEST}, still accepted by {@link #valueFrom(String)}. */
    private static final String LEGACY_PAUSE_NAME = "gause";

    private final String name;

    private WsNotificationOperations(String n) {
        name = n;
    }

    /** @return the operation name, not the constant's identifier */
    public String toString() {
        return name;
    }

    /**
     * Case-sensitive comparison of the operation name with {@code s}. This is an overload, not an override
     * of {@link Object#equals(Object)}.
     */
    public boolean equals(String s) {
        return name.equals(s);
    }

    /**
     * Looks an operation up by name, ignoring case.
     *
     * @param s
     *            operation name; the legacy spelling {@code "gause"} still maps to {@link #PAUSE_REQUEST}
     * @return the matching operation
     * @throws RuntimeException
     *             when no operation has that name
     */
    public static WsNotificationOperations valueFrom(String s) {
        for (WsNotificationOperations status : WsNotificationOperations.values()) {
            if (status.toString().equalsIgnoreCase(s)) {
                return status;
            }
        }

        // Keep callers that were written against the old misspelling working.
        if (LEGACY_PAUSE_NAME.equalsIgnoreCase(s)) {
            return PAUSE_REQUEST;
        }

        throw new RuntimeException("invalid Ws notification Operation:- " + s);
    }

}
-- Removes all rows from the message broker tables; the tables themselves are kept.
-- NOTE(review): msgbox is not created by the msgBroker-derby.sql / msgBroker-mysql.sql scripts of this
-- module; presumably it belongs to the message box schema -- confirm it exists before running this script.

delete from disQ;
delete from MaxIDTable;
delete from MinIDTable;
delete from specialSubscription;
delete from subscription;
delete from msgbox;
-- Derby schema for the message broker.

-- SUBSCRIPTION and SPECIALSUBSCRIPTION share the same column layout.
-- Neither table declares a primary key or unique constraint on SUBSCRIPTIONID.
CREATE TABLE SUBSCRIPTION (
          SUBSCRIPTIONID VARCHAR(200) NOT NULL DEFAULT '',
          TOPICS VARCHAR(255) DEFAULT '',
          XPATH VARCHAR(200) DEFAULT '',
          CONSUMERADDRESS VARCHAR(255) DEFAULT '',
          REFERENCEPROPERTIES BLOB,
          CONTENT BLOB,
          WSRM INTEGER NOT NULL DEFAULT 0 ,
          CREATIONTIME TIMESTAMP NOT NULL
        );
CREATE TABLE SPECIALSUBSCRIPTION (
          SUBSCRIPTIONID VARCHAR(200) NOT NULL DEFAULT '',
          TOPICS VARCHAR(255) DEFAULT '',
          XPATH VARCHAR(200) DEFAULT '',
          CONSUMERADDRESS VARCHAR(255) DEFAULT '',
          REFERENCEPROPERTIES BLOB,
          CONTENT BLOB,
          WSRM INTEGER NOT NULL DEFAULT 0,
          CREATIONTIME TIMESTAMP NOT NULL
        );


-- Stored messages; ID is generated by Derby unless a value is supplied explicitly.
CREATE TABLE DISQ (
          ID BIGINT GENERATED BY DEFAULT AS IDENTITY,
          TRACKID VARCHAR(100) DEFAULT NULL,
          MESSAGE BLOB,
          STATUS INTEGER DEFAULT NULL,
          TOPIC VARCHAR(255) DEFAULT '',
          PRIMARY KEY (ID)
        );

-- Single-column bookkeeping tables.
-- NOTE(review): presumably they hold the highest / lowest DISQ id still of interest -- confirm.
-- They are INTEGER while DISQ.ID is BIGINT.
CREATE TABLE MAXIDTABLE(
       MAXID INTEGER
       );

CREATE TABLE MINIDTABLE(
       MINID INTEGER
       );
-- MySQL schema for the message broker.

-- The database is created but never selected with USE, so the tables below are created in whatever
-- database the session is currently using.
CREATE DATABASE IF NOT EXISTS wsmg;

-- subscription and specialSubscription share the same column layout.
-- Neither table declares a primary key or unique constraint on SubscriptionId.
-- NOTE(review): the zero-date default on CreationTime is presumably rejected when the server runs with
-- NO_ZERO_DATE / strict SQL mode -- confirm against the target MySQL version.
CREATE TABLE `subscription` (
          `SubscriptionId` varchar(200) NOT NULL default '',
          `Topics` varchar(255) default '',
          `XPath` varchar(200) default '',
          `ConsumerAddress` varchar(255) default '',
          `ReferenceProperties` blob,
          `content` blob,
          `wsrm` tinyint(1) NOT NULL default '0',
          `CreationTime` datetime NOT NULL default '0000-00-00 00:00:00'
        );
CREATE TABLE `specialSubscription` (
          `SubscriptionId` varchar(200) NOT NULL default '',
          `Topics` varchar(255) default '',
          `XPath` varchar(200) default '',
          `ConsumerAddress` varchar(255) default '',
          `ReferenceProperties` blob,
          `content` blob,
          `wsrm` tinyint(1) NOT NULL default '0',
          `CreationTime` datetime NOT NULL default '0000-00-00 00:00:00'
        );


-- Stored messages; id is assigned by auto_increment.
CREATE TABLE `disQ` (
          `id` bigint(11) NOT NULL auto_increment,
          `trackId` varchar(100) default NULL,
          `message` longblob,
          `status` int(11) default NULL,
          `topic` varchar(255) default '',
          PRIMARY KEY  (`id`)
        );

-- Single-column bookkeeping tables.
-- NOTE(review): presumably they hold the highest / lowest disQ id still of interest -- confirm.
-- They are integer while disQ.id is bigint.
CREATE TABLE MaxIDTable(
       maxID integer
       );

CREATE TABLE MinIDTable(
       minID integer
       );
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/resources/services.xml ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/resources/services.xml b/modules/ws-messenger/messagebroker/src/main/resources/services.xml deleted file mode 100644 index 229262c..0000000 --- a/modules/ws-messenger/messagebroker/src/main/resources/services.xml +++ /dev/null @@ -1,125 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<serviceGroup> - <service name="EventingService" class="org.apache.airavata.wsmg.broker.BrokerServiceLifeCycle"> - - <operation name="renew"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" /> - <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew - </actionMapping> - <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/RenewResponse - </outputActionMapping> - </operation> - - <operation name="getStatus"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" /> - <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus - </actionMapping> - <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse - </outputActionMapping> - </operation> - - <operation name="subscriptionEnd"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" /> - <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscriptionEnd - </actionMapping> - </operation> - - <operation name="subscribe"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" /> - <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe - </actionMapping> - <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscribeResponse - </outputActionMapping> - </operation> - - <operation name="unsubscribe"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" /> - <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe - </actionMapping> - <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/UnsubscribeResponse - </outputActionMapping> - </operation> - - <operation name="publish"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingPublishMsgReceiver" /> - <actionMapping>http://org.apache.airavata/WseNotification - </actionMapping> - </operation> - - 
</service> - - <service name="NotificationService" class="org.apache.airavata.wsmg.broker.BrokerServiceLifeCycle"> - - <operation name="notify"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" /> - <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/Notify - </actionMapping> - <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/NotifyResponse - </outputActionMapping> - </operation> - - <operation name="subscribe"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" /> - <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequest - </actionMapping> - <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequestResponse - </outputActionMapping> - </operation> - - <operation name="getCurrentMessage"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" /> - <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageRequest - </actionMapping> - <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageResponse - </outputActionMapping> - </operation> - - <operation name="pauseSubscription"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" /> - <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubsriptionRequest - </actionMapping> - <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubscriptionResponse - </outputActionMapping> - </operation> - - <operation name="resumeSubscription"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" /> - 
<actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubsriptionRequest - </actionMapping> - <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubscriptionResponse - </outputActionMapping> - </operation> - - <operation name="unsubscribe"> - <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" /> - <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubsribeRequest - </actionMapping> - <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubscribeResponse - </outputActionMapping> - </operation> - - </service> - - <parameter name="configuration.file.name" locked="false">airavata-server.properties</parameter> - -</serviceGroup> http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java deleted file mode 100644 index 4867ba7..0000000 --- a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker; - -import java.io.IOException; -import java.util.Properties; - -import junit.framework.TestCase; - -import org.apache.airavata.wsmg.client.ConsumerNotificationHandler; -import org.apache.airavata.wsmg.client.WseMsgBrokerClient; -import org.apache.airavata.wsmg.util.TestUtilServer; -import org.apache.axiom.om.impl.llom.util.AXIOMUtil; -import org.apache.axiom.soap.SOAPEnvelope; -import org.apache.axis2.AxisFault; -import org.junit.Test; - -public class BrokerWSETest extends TestCase implements ConsumerNotificationHandler { - - private static int port = TestUtilServer.TESTING_PORT; - static Properties configs = new Properties(); - - public void handleNotification(SOAPEnvelope msgEnvelope) { - System.out.println("Received " + msgEnvelope); - } - - @Override - protected void setUp() throws Exception { - TestUtilServer.start(null, null); - } - - @Override - protected void tearDown() throws Exception { - TestUtilServer.stop(); - } - - @Test - public void testRoundTrip() throws InterruptedException { - - try { - - String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService"; - long value = System.currentTimeMillis(); - String msg = String.format("<msg> current time is : %d </msg>", value); - - WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient(); - wseMsgBrokerClient.init(brokerEPR); - int consumerPort = TestUtilServer.getAvailablePort(); - - String[] consumerEPRs = wseMsgBrokerClient.startConsumerService(consumerPort, this); - - 
assertTrue(consumerEPRs.length > 0); - - String topic = "WseRoundTripTestTopic"; - - String subscriptionID = wseMsgBrokerClient.subscribe(consumerEPRs[0], topic, null); - System.out.println("topic sub id = " + subscriptionID); - - try { - wseMsgBrokerClient.publish(topic, msg); - wseMsgBrokerClient.publish(topic, AXIOMUtil.stringToOM("<foo><bar>Test</bar></foo>")); - } catch (Exception e) { - fail(e.getMessage()); - } - - Thread.sleep(2000); - - try { - wseMsgBrokerClient.unSubscribe(subscriptionID); - } catch (AxisFault e) { - e.printStackTrace(); - fail(e.getMessage()); - } - wseMsgBrokerClient.shutdownConsumerService(); - - } catch (AxisFault e) { - e.printStackTrace(); - try { - System.in.read(); - } catch (IOException e1) { - e1.printStackTrace(); - } - fail("unexpected exception occured"); - } - System.out.println("Broker roundtrip done"); - - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java deleted file mode 100644 index baebd31..0000000 --- a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker; - -import java.io.IOException; -import java.util.Properties; - -import junit.framework.TestCase; - -import org.apache.airavata.wsmg.client.ConsumerNotificationHandler; -import org.apache.airavata.wsmg.client.WsntMsgBrokerClient; -import org.apache.airavata.wsmg.util.TestUtilServer; -import org.apache.axiom.om.impl.llom.util.AXIOMUtil; -import org.apache.axiom.soap.SOAPEnvelope; -import org.apache.axis2.AxisFault; -import org.junit.Test; - -public class BrokerWSNTTest extends TestCase implements ConsumerNotificationHandler { - - static Properties configs = new Properties(); - - public void handleNotification(SOAPEnvelope msgEnvelope) { - System.out.println("Received " + msgEnvelope); - } - - @Override - protected void setUp() throws Exception { - TestUtilServer.start(null, null); - } - - @Override - protected void tearDown() throws Exception { - TestUtilServer.stop(); - } - - @Test - public void testRoundTrip() throws InterruptedException { - - try { - long value = System.currentTimeMillis(); - String msg = String.format("<msg> current time is : %d </msg>", value); - - WsntMsgBrokerClient wsntMsgBrokerClient = new WsntMsgBrokerClient(); - - int consumerPort = 6767; - - String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/NotificationService"; - wsntMsgBrokerClient.init(brokerEPR); - String[] consumerEPRs = wsntMsgBrokerClient.startConsumerService(consumerPort, this); - - assertTrue(consumerEPRs.length > 0); - - String topic = "/WsntRoundTripTestTopic"; - - String 
topicSubscriptionID = wsntMsgBrokerClient.subscribe(consumerEPRs[0], topic, null); - System.out.println("topic subscription id: " + topicSubscriptionID); - - try { - wsntMsgBrokerClient.publish(topic, msg); - wsntMsgBrokerClient.publish(topic, AXIOMUtil.stringToOM("<foo><bar>Test</bar></foo>")); - } catch (Exception e) { - fail(e.getMessage()); - } - - Thread.sleep(2000); - - try { - wsntMsgBrokerClient.unSubscribe(topicSubscriptionID); - } catch (AxisFault e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - wsntMsgBrokerClient.shutdownConsumerService(); - - } catch (AxisFault e) { - e.printStackTrace(); - try { - System.in.read(); - } catch (IOException e1) { - e1.printStackTrace(); - } - - fail("unexpected exception occured"); - } - System.out.println("Broker roundtrip done"); - - } -}
