Hi Pavel, On 05/08/2011 08:33 PM, [email protected] wrote: > Hallo, > I have a problem compiling the following code using gt4.2.0. This code > works in gt4.0.8. The problem is caused by the methods setUseNotify and > setTopicExpression from class Subscribe, which are not present in > gt4.2.0. Could you please help me to work around this problem? > Thank you very much.
please find attached code I use for consuming notifications in GT4.2.x. The data types are custom and not included, but you should get the general idea of how it works with GT4.2.x. Cheers, Roland -- Roland Kübert Höchstleistungsrechenzentrum Stuttgart Universität Stuttgart Nobelstraße 19 70569 Stuttgart E-Mail: [email protected] Phone: +49 711 685 65802 Skype: rkuebert
/* $Id: MonitorListener.java 10 2011-01-24 14:59:21Z rkuebert $ */ /* Copyright 2010 University of Stuttgart Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package de.hlrs.isoni.monitor.client; import java.util.List; import javax.xml.namespace.QName; import javax.xml.rpc.ServiceException; import org.apache.axis.message.MessageElement; import org.apache.axis.types.URI.MalformedURIException; import org.globus.axis.message.addressing.Address; import org.globus.axis.message.addressing.EndpointReferenceType; import org.globus.wsrf.NotificationConsumerManager; import org.globus.wsrf.NotifyCallback; import org.globus.wsrf.WSNConstants; import org.globus.wsrf.encoding.DeserializationException; import org.globus.wsrf.encoding.ObjectDeserializer; import org.globus.wsrf.encoding.ObjectSerializer; import org.oasis.wsn.FilterType; import org.oasis.wsn.NotificationMessageHolderTypeMessage; import org.oasis.wsn.NotificationProducer; import org.oasis.wsn.Subscribe; import org.oasis.wsn.SubscribeResponse; import org.oasis.wsn.TopicExpressionType; import org.oasis.wsn.WSBaseNotificationServiceAddressingLocator; import de.uni_stuttgart.rus.irmos.Monitoring.ISONI_Domain_Monitoring_Report; import de.uni_stuttgart.rus.irmos.Notification.SLA_Notification_Event; /** * Listener client for the ISONI Monitoring service. * <p/> * Subscribes for notifications and prints out delivered notifications. * * @author roland */ public class MonitorListener implements NotifyCallback { /** * Prints delivered notifications to stdout. 
* * @param topicPath * @param producder the endpoint reference of the notification producer * @param message the notification message */ @SuppressWarnings("unchecked") public void deliver(List topicPath, EndpointReferenceType producer, Object message) { System.out.println("deliver() called..."); //printTopicPath(topicPath); String topic = getTopic(topicPath); if (topic == null) { System.out.println("Could not obtain topic, not treating message any further"); return; } if (! (message instanceof NotificationMessageHolderTypeMessage)) { System.err.println("Non-WS-N message received, ignoring it"); return; } NotificationMessageHolderTypeMessage holder = (NotificationMessageHolderTypeMessage) message; try { MessageElement messageContent = holder.get_any()[0]; if (topic.endsWith("-MON")) { System.out.println("Received monitoring report"); ISONI_Domain_Monitoring_Report report = (ISONI_Domain_Monitoring_Report) ObjectDeserializer.toObject( messageContent, ISONI_Domain_Monitoring_Report.class); System.out.println("Got report for VSN: " + report.getOWL_VSN_Individual_ID()); } else if (topic.endsWith("-SLA")) { System.out.println("Received sla notification event"); SLA_Notification_Event slaEvent = (SLA_Notification_Event) ObjectDeserializer.toObject( messageContent, SLA_Notification_Event.class); System.out.println("Got event for VSN: " + slaEvent.getOWL_VSN_Individual_ID()); } else { System.out.println("Unknown message received, not handling it"); } } catch (DeserializationException e) { System.err.println("Could not extract monitoring report/SLA notification event from WS-Notification message"); } } @SuppressWarnings("unchecked") private String getTopic(List topicPath) { QName topic = null; if (topicPath.size() != 1) { System.err.println("Topic path should contain exactly one topic"); } else { System.err.println("Trying to get topic from topicPath"); try { topic = (QName) topicPath.get(0); } catch (Exception exception ){ exception.printStackTrace(); } } return 
topic.toString(); } /** * Prints a complete topic path. * * @param topicPath the list of topic paths. */ @SuppressWarnings({ "unused", "unchecked" }) private void printTopicPath(List topicPath) { for (int i = 0; i < topicPath.size(); i++) { System.out.println("topicPath[" + i + "]: " + topicPath.get(i)); } } /** * Subscribes to the service at <code>serviceURI</code> and listens * for notifications. * * @param serviceURI the URI of the monitoring service */ public void run(String serviceURI, String slaId) { System.out.println("Subscribing to service at " + serviceURI); try { // The NotificationConsumerManager sets up an endpoint where // notifications will be delivered. NotificationConsumerManager consumer; consumer = NotificationConsumerManager.getInstance(); consumer.startListening(); System.out.println("Consumer URL: " + consumer.getURL()); EndpointReferenceType consumerEpr = consumer .createNotificationConsumer(this); // Create topic QName monitoringTopic = new QName( "http://www.hlrs.de/namespaces/services/isoni/Monitor", slaId + "-MON"); QName slaEventTopic = new QName( "http://www.hlrs.de/namespaces/services/isoni/Monitor", slaId + "-SLA"); // Create the request to the remote Subscribe() call Subscribe monitoringRequest = getSubscribeRequestForTopic(consumerEpr, monitoringTopic); Subscribe slaEventRequest = getSubscribeRequestForTopic(consumerEpr, slaEventTopic); // Get a reference to the NotificationProducer portType NotificationProducer producerPort = getNotificationProducerPort(serviceURI); // Start the ball rolling... SubscribeResponse subscribeResponse = producerPort .subscribe(monitoringRequest); System.out.println("Subscribed to topic '" + subscribeResponse + "'; repsonse: " + subscribeResponse.getSubscriptionReference()); subscribeResponse = producerPort .subscribe(slaEventRequest); System.out.println("Subscribed to topic '" + subscribeResponse + "'; repsonse: " + subscribeResponse.getSubscriptionReference()); // Loop forever and print notifications. 
System.out.println("Waiting for notification. Ctrl-C to end."); while (true) { try { Thread.sleep(30000); } catch (Exception e) { System.out.println("Interrupted while sleeping."); } } } catch (Exception e) { e.printStackTrace(); } } private Subscribe getSubscribeRequestForTopic(EndpointReferenceType consumerEpr, QName topic) { // Create the request to the remote Subscribe() call Subscribe request = new Subscribe(); // Must the notification be delivered using the Notify operation? // request.setUseNotify(Boolean.TRUE); // Indicate what the client's EPR is try { request.setConsumerReference(consumerEpr); TopicExpressionType topicExpression = new TopicExpressionType(); topicExpression.setDialect(WSNConstants.SIMPLE_TOPIC_DIALECT); topicExpression.setValue(topic); MessageElement message = (MessageElement) ObjectSerializer .toSOAPElement(topicExpression, WSNConstants.TOPIC_EXPRESSION); FilterType filter = new FilterType(); filter.set_any(new MessageElement[] { message }); request.setFilter(filter); } catch (Exception e) { System.err.println("Error building subscribe request for topic " + topic.toString()); } return request; } /** * Creates a notification producer port for the given URI and returns it. 
* * @param serviceUri * the URI for which to create the notification producer * @return the created notification producer * @throws MalformedURIException * if the URI is not well-formed * @throws ServiceException * if any error on the service side occurs */ private NotificationProducer getNotificationProducerPort(String serviceUri) throws MalformedURIException, ServiceException { WSBaseNotificationServiceAddressingLocator notifLocator = new WSBaseNotificationServiceAddressingLocator(); EndpointReferenceType endpoint = new EndpointReferenceType(); endpoint.setAddress(new Address(serviceUri)); NotificationProducer producerPort = notifLocator .getNotificationProducerPort(endpoint); return producerPort; } /** * Main method - creates a new <code>MonitorListener</code> and * runs it. * * @param args command line arguments */ public static void main(String[] args) { MonitorListener client = new MonitorListener(); if ( args.length != 2 ) { usage(); } client.run(args[0], args[1]); } /** * Prints out usage information and leaves the program. */ private static void usage() { System.err.println("Usage:\njava MonitorListener URI TOPIC"); System.exit(0); } }
