http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/MessageBrokerServlet.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageBrokerServlet.java b/core/src/flex/messaging/MessageBrokerServlet.java new file mode 100644 index 0000000..0e8efb0 --- /dev/null +++ b/core/src/flex/messaging/MessageBrokerServlet.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package flex.messaging; + +import flex.management.MBeanLifecycleManager; +import flex.management.MBeanServerLocatorFactory; +import flex.messaging.config.ConfigurationManager; +import flex.messaging.config.FlexConfigurationManager; +import flex.messaging.config.MessagingConfiguration; +import flex.messaging.endpoints.Endpoint; +import flex.messaging.io.SerializationContext; +import flex.messaging.io.TypeMarshallingContext; +import flex.messaging.log.HTTPRequestLog; +import flex.messaging.log.Log; +import flex.messaging.log.LogCategories; +import flex.messaging.log.Logger; +import flex.messaging.log.LoggingHttpServletRequestWrapper; +import flex.messaging.log.ServletLogTarget; +import flex.messaging.services.AuthenticationService; +import flex.messaging.util.ClassUtil; +import flex.messaging.util.ExceptionUtil; +import flex.messaging.util.Trace; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.security.Principal; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The MessageBrokerServlet bootstraps the MessageBroker, + * adds endpoints to it, and starts the broker. The servlet + * also acts as a facade for all http-based endpoints, in that + * the servlet receives the http request and then delegates to + * an endpoint that can handle the request's content type. This + * does not occur for non-http endpoints, such as the rtmp endpoint. 
+ * + * @see flex.messaging.MessageBroker + * + */ +public class MessageBrokerServlet extends HttpServlet +{ + static final long serialVersionUID = -5293855229461612246L; + + public static final String LOG_CATEGORY_STARTUP_BROKER = LogCategories.STARTUP_MESSAGEBROKER; + private static final String STRING_UNDEFINED_APPLICATION = "undefined"; + + private MessageBroker broker; + private HttpFlexSessionProvider httpFlexSessionProvider; + private static String FLEXDIR = "/WEB-INF/flex/"; + private boolean log_errors = false; + + /** + * Initializes the servlet in its web container, then creates + * the MessageBroker and adds Endpoints and Services to that broker. + * This servlet may keep a reference to an endpoint if it needs to + * delegate to it in the <code>service</code> method. + */ + public void init(ServletConfig servletConfig) throws ServletException + { + super.init(servletConfig); + + // allocate thread local variables + createThreadLocals(); + + // Set the servlet config as thread local + FlexContext.setThreadLocalObjects(null, null, null, null, null, servletConfig); + + ServletLogTarget.setServletContext(servletConfig.getServletContext()); + + ClassLoader loader = getClassLoader(); + + if ("true".equals(servletConfig.getInitParameter("useContextClassLoader"))) + { + loader = Thread.currentThread().getContextClassLoader(); + } + + // Should we wrap http request for later error logging? + log_errors = HTTPRequestLog.init(getServletContext()); + + // Start the broker + try + { + // Get the configuration manager + ConfigurationManager configManager = loadMessagingConfiguration(servletConfig); + + // Load configuration + MessagingConfiguration config = configManager.getMessagingConfiguration(servletConfig); + + // Set up logging system ahead of everything else. + config.createLogAndTargets(); + + // Create broker. 
+ broker = config.createBroker(servletConfig.getInitParameter("messageBrokerId"), loader); + + // Set the servlet config as thread local + FlexContext.setThreadLocalObjects(null, null, broker, null, null, servletConfig); + + setupPathResolvers(); + + // Set initial servlet context on broker + broker.setServletContext(servletConfig.getServletContext()); + + Logger logger = Log.getLogger(ConfigurationManager.LOG_CATEGORY); + if (Log.isInfo()) + { + logger.info(VersionInfo.buildMessage()); + } + + // Create endpoints, services, security, and logger on the broker based on configuration + config.configureBroker(broker); + + long timeBeforeStartup = 0; + if (Log.isDebug()) + { + timeBeforeStartup = System.currentTimeMillis(); + Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is starting.", + new Object[]{broker.getId()}); + } + + //initialize the httpSessionToFlexSessionMap + synchronized(HttpFlexSession.mapLock) + { + if (servletConfig.getServletContext().getAttribute(HttpFlexSession.SESSION_MAP) == null) + servletConfig.getServletContext().setAttribute(HttpFlexSession.SESSION_MAP, new ConcurrentHashMap()); + } + + broker.start(); + + if (Log.isDebug()) + { + long timeAfterStartup = System.currentTimeMillis(); + Long diffMillis = timeAfterStartup - timeBeforeStartup; + Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is ready (startup time: '{1}' ms)", + new Object[]{broker.getId(), diffMillis}); + } + + // Report replaced tokens + configManager.reportTokens(); + + // Report any unused properties. + config.reportUnusedProperties(); + + // Setup provider for FlexSessions that wrap underlying J2EE HttpSessions. 
+ httpFlexSessionProvider = new HttpFlexSessionProvider(); + broker.getFlexSessionManager().registerFlexSessionProvider(HttpFlexSession.class, httpFlexSessionProvider); + + // clear the broker and servlet config as this thread is done + FlexContext.clearThreadLocalObjects(); + } + catch (Throwable t) + { + // On any unhandled exception destroy the broker, log it and rethrow. + String applicationName = servletConfig.getServletContext().getServletContextName(); + if (applicationName == null) + applicationName = STRING_UNDEFINED_APPLICATION; + + System.err.println("**** MessageBrokerServlet in application '" + applicationName + + "' failed to initialize due to runtime exception: " + + ExceptionUtil.exceptionFollowedByRootCausesToString(t)); + destroy(); + // We used to throw UnavailableException, but Weblogic didn't mark the webapp as failed. See bug FBR-237 + throw new ServletException(t); + } + } + + private void setupPathResolvers() + { + setupExternalPathResolver(); + setupInternalPathResolver(); + } + + private void setupExternalPathResolver() + { + broker.setExternalPathResolver( + new MessageBroker.PathResolver() + { + public InputStream resolve(String filename) throws FileNotFoundException + { + return new FileInputStream(new File(filename)); + } + } + ); + } + + private void setupInternalPathResolver() + { + broker.setInternalPathResolver( + new MessageBroker.InternalPathResolver() + { + public InputStream resolve(String filename) + { + return getServletContext().getResourceAsStream(FLEXDIR + filename); + } + } + ); + } + + private static ConfigurationManager loadMessagingConfiguration(ServletConfig servletConfig) + { + ConfigurationManager manager = null; + Class managerClass; + String className; + + // Check for Custom Configuration Manager Specification + if (servletConfig != null) + { + String p = servletConfig.getInitParameter("services.configuration.manager"); + if (p != null) + { + className = p.trim(); + try + { + managerClass = 
ClassUtil.createClass(className); + manager = (ConfigurationManager)managerClass.newInstance(); + } + catch (Throwable t) + { + if (Trace.config) // Log is not initialized yet. + Trace.trace("Could not load configuration manager as: " + className); + } + } + } + + if (manager == null) + { + manager = new FlexConfigurationManager(); + } + + return manager; + } + + /** + * Stops all endpoints in the MessageBroker, giving them a chance + * to perform any endpoint-specific clean up. + */ + public void destroy() + { + if (broker != null) + { + broker.stop(); + if (broker.isManaged()) + { + MBeanLifecycleManager.unregisterRuntimeMBeans(broker); + } + // release static thread locals + destroyThreadLocals(); + } + } + + /** + * Handle an incoming request, and delegate to an endpoint based on + * content type, if appropriate. The content type mappings for endpoints + * are not externally configurable, and currently the AmfEndpoint + * is the only delegate. + */ + public void service(HttpServletRequest req, HttpServletResponse res) + { + if (log_errors) + { + // Create a wrapper for the request object so we can save the body content + LoggingHttpServletRequestWrapper wrapper = new LoggingHttpServletRequestWrapper(req); + req = wrapper; + + try + { + // Read the body content + wrapper.doReadBody(); + } + catch (IOException ignore) + { + // ignore, the wrapper will preserve what content we were able to read. + } + } + + try + { + // Update thread locals + broker.initThreadLocals(); + // Set this first so it is in place for the session creation event. The + // current session is set by the FlexSession stuff right when it is available. + // The threadlocal FlexClient is set up during message deserialization in the + // MessageBrokerFilter. 
+ FlexContext.setThreadLocalObjects(null, null, broker, req, res, getServletConfig()); + + HttpFlexSession fs = httpFlexSessionProvider.getOrCreateSession(req); + Principal principal; + if(FlexContext.isPerClientAuthentication()) + { + principal = FlexContext.getUserPrincipal(); + } + else + { + principal = fs.getUserPrincipal(); + } + + if (principal == null && req.getHeader("Authorization") != null) + { + String encoded = req.getHeader("Authorization"); + if (encoded.indexOf("Basic") > -1) + { + encoded = encoded.substring(6); //Basic.length()+1 + try + { + ((AuthenticationService)broker.getService(AuthenticationService.ID)).decodeAndLogin(encoded, broker.getLoginManager()); + } + catch (Exception e) + { + if (Log.isDebug()) + Log.getLogger(LogCategories.SECURITY).info("Authentication service could not decode and login: " + e.getMessage()); + } + } + } + + String contextPath = req.getContextPath(); + String pathInfo = req.getPathInfo(); + String endpointPath = req.getServletPath(); + if (pathInfo != null) + endpointPath = endpointPath + pathInfo; + + Endpoint endpoint; + try + { + endpoint = broker.getEndpoint(endpointPath, contextPath); + } + catch (MessageException me) + { + if (Log.isInfo()) + Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Received invalid request for endpoint path '{0}'.", new Object[] {endpointPath}); + + if (!res.isCommitted()) + { + try + { + res.sendError(HttpServletResponse.SC_NOT_FOUND); + } + catch (IOException ignore) + {} + } + + return; + } + + try + { + if (Log.isInfo()) + { + Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request.", + new Object[] {endpoint.getId()}); + } + endpoint.service(req, res); + } + catch (UnsupportedOperationException ue) + { + if (Log.isInfo()) + { + Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request for an unsupported operation.", + new Object[] {endpoint.getId()}, + ue); + } + + if (!res.isCommitted()) + { + try + { + 
res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); + } + catch (IOException ignore) + {} + } + } + } + catch (Throwable t) + { + // Final resort catch block as recommended by Fortify as a potential System info leak + try + { + Log.getLogger(LogCategories.ENDPOINT_GENERAL).error("Unexpected error encountered in Message Broker servlet", t); + res.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + catch (IOException ignore) + { + // ignore + } + + } + finally + { + if (log_errors) + { + String info = (String) req.getAttribute(HTTPRequestLog.HTTP_ERROR_INFO); + if (info != null) + { + // Log the HttpRequest data + System.out.println("Exception occurred while processing HTTP request: " + info + ", request details logged in " + HTTPRequestLog.getFileName()); + HTTPRequestLog.outputRequest(info, req); + } + } + + FlexContext.clearThreadLocalObjects(); + } + } + + /** + * Hook for subclasses to override the class loader to use for loading user defined classes. + * + * @return the class loader for this class + */ + protected ClassLoader getClassLoader() + { + return this.getClass().getClassLoader(); + } + + + // Call ONLY on servlet startup + public static void createThreadLocals() + { + // allocate static thread local objects + FlexContext.createThreadLocalObjects(); + SerializationContext.createThreadLocalObjects(); + TypeMarshallingContext.createThreadLocalObjects(); + } + + + // Call ONLY on servlet shutdown + protected static void destroyThreadLocals() + { + // clear static member variables + Log.clear(); + MBeanServerLocatorFactory.clear(); + + // Destroy static thread local objects + FlexContext.releaseThreadLocalObjects(); + SerializationContext.releaseThreadLocalObjects(); + TypeMarshallingContext.releaseThreadLocalObjects(); + } + +}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/MessageClient.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageClient.java b/core/src/flex/messaging/MessageClient.java new file mode 100644 index 0000000..56768e7 --- /dev/null +++ b/core/src/flex/messaging/MessageClient.java @@ -0,0 +1,1148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package flex.messaging; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + +import flex.messaging.client.FlexClient; +import flex.messaging.client.FlexClientOutboundQueueProcessor; +import flex.messaging.client.OutboundQueueThrottleManager; +import flex.messaging.config.ThrottleSettings; +import flex.messaging.config.ThrottleSettings.Policy; +import flex.messaging.log.LogCategories; +import flex.messaging.log.Log; +import flex.messaging.messages.AsyncMessage; +import flex.messaging.messages.CommandMessage; +import flex.messaging.messages.Message; +import flex.messaging.services.MessageService; +import flex.messaging.services.messaging.Subtopic; +import flex.messaging.services.messaging.selector.JMSSelector; +import flex.messaging.services.messaging.selector.JMSSelectorException; +import flex.messaging.util.ExceptionUtil; +import flex.messaging.util.TimeoutAbstractObject; +import flex.messaging.util.StringUtils; + +/** + * Represents a client-side MessageAgent instance. + * Currently a server-side MessageClient is only created if its client-side counterpart has subscribed + * to a destination for pushed data (e.g. Consumer). Client-side Producers do not result in the creation of + * corresponding server-side MessageClient instances. + * + * Client-side MessageAgents communicate with the server over a Channel that corresponds to a FlexSession. + * Server-side MessageClient instances are always created in the context of a FlexSession and when the FlexSession + * is invalidated any associated MessageClients are invalidated as well. + * + * MessageClients may also be timed out on a per-destination basis and this is based on subscription inactivity. 
+ * If no messages are pushed to the MessageClient within the destination's subscription timeout period the + * MessageClient will be shutdown even if the associated FlexSession is still active and connected. + * Per-destination subscription timeout is an optional configuration setting, and should only be used when inactive + * subscriptions should be shut down opportunistically to preserve server resources. + */ +public class MessageClient extends TimeoutAbstractObject implements Serializable +{ + //-------------------------------------------------------------------------- + // + // Public Static Variables + // + //-------------------------------------------------------------------------- + + /** + * Log category for MessageClient related messages. + */ + public static final String MESSAGE_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_MESSAGECLIENT; + + //-------------------------------------------------------------------------- + // + // Static Constants + // + //-------------------------------------------------------------------------- + + /** + * Serializable to support broadcasting subscription state across the cluster for + * optimized message routing. + */ + static final long serialVersionUID = 3730240451524954453L; + + //-------------------------------------------------------------------------- + // + // Static Variables + // + //-------------------------------------------------------------------------- + + /** + * The list of MessageClient created listeners. + */ + private static final CopyOnWriteArrayList<MessageClientListener> createdListeners = new CopyOnWriteArrayList<MessageClientListener>(); + + //-------------------------------------------------------------------------- + // + // Static Methods + // + //-------------------------------------------------------------------------- + + /** + * Adds a MessageClient created listener. + * + * @see flex.messaging.MessageClientListener + * + * @param listener The listener to add. 
+ */ + public static void addMessageClientCreatedListener(MessageClientListener listener) + { + if (listener != null) + createdListeners.addIfAbsent(listener); + } + + /** + * Removes a MessageClient created listener. + * + * @see flex.messaging.MessageClientListener + * + * @param listener The listener to remove. + */ + public static void removeMessageClientCreatedListener(MessageClientListener listener) + { + if (listener != null) + createdListeners.remove(listener); + } + + //-------------------------------------------------------------------------- + // + // Private Static Methods + // + //-------------------------------------------------------------------------- + + /** + * Utility method. + */ + private static boolean equalStrings(String a, String b) + { + return a == b || (a != null && a.equals(b)); + } + + /** + * Utility method. + */ + static int compareStrings(String a, String b) + { + if (a == b) + return 0; + + if (a != null && b != null) + return a.compareTo(b); + + if (a == null) + return -1; + + return 1; + } + + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * + * Constructs a new MessageClient for local use. + * + * @param clientId The clientId for the MessageClient. + * @param destination The destination the MessageClient is subscribed to. + * @param endpointId The Id of the endpoint this MessageClient subscription was created over. + */ + public MessageClient(Object clientId, Destination destination, String endpointId) + { + this(clientId, destination, endpointId, true); + } + + /** + * + * Constructs a new MessageClient. + * + * @param clientId The clientId for the MessageClient. + * @param destination The destination the MessageClient is subscribed to. + * @param endpointId The Id of the endpoint this MessageClient subscription was created over. 
+ * @param useSession RemoteMessageClient instances should not be associated with a FlexSession (pass false). + */ + public MessageClient(Object clientId, Destination destination, String endpointId, boolean useSession) + { + valid = true; + this.clientId = clientId; + this.destination = destination; + this.endpointId = endpointId; + destinationId = destination.getId(); + updateLastUse(); // Initialize last use timestamp to construct time. + + /* If this is for a remote server, we do not associate with the session. */ + if (useSession) + { + flexSession = FlexContext.getFlexSession(); + flexSession.registerMessageClient(this); + + flexClient = FlexContext.getFlexClient(); + flexClient.registerMessageClient(this); + + // SubscriptionManager will notify the created listeners, once + // subscription state is setup completely. + // notifyCreatedListeners(); + } + else + { + flexClient = null; + flexSession = null; + // Use an instance level lock. + lock = new Object(); + // On a remote server we don't notify created listeners. + } + + if (Log.isDebug()) + Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient created with clientId '" + this.clientId + "' for destination '" + destinationId + "'."); + } + + //-------------------------------------------------------------------------- + // + // Variables + // + //-------------------------------------------------------------------------- + + /** + * This flag is set to true when the client channel that this subscription was + * established over is disconnected. + * It supports cleaning up per-endpoint outbound queues maintained by the FlexClient. + * If the client notifies the server that its channel is disconnecting, the FlexClient + * does not need to maintain an outbound queue containing a subscription invalidation + * message for this MessageClient to send to the client. + */ + private volatile boolean clientChannelDisconnected; + + /** + * The clientId for the MessageClient. 
     * This value is specified by the client directly or is autogenerated on the client.
     * Assigned once in the constructor.
     */
    protected final Object clientId;

    /**
     * Internal reference to the associated Destination; don't expose this in the public API.
     * Assigned once in the constructor.
     */
    protected final Destination destination;

    /**
     * The id of the destination the MessageClient is subscribed to, captured
     * from <code>destination.getId()</code> in the constructor.
     */
    protected final String destinationId;

    /**
     * The MessageClient destroyed listeners registered through
     * addMessageClientDestroyedListener. Null until the first listener is added;
     * the list is created lazily while holding <code>lock</code>.
     */
    private transient volatile CopyOnWriteArrayList destroyedListeners;

    /**
     * The Id for the endpoint this MessageClient subscription was created over.
     * Not final: resetEndpoint replaces it, while holding <code>lock</code>, with
     * the id of the new endpoint.
     */
    private String endpointId;

    /**
     * The FlexClient associated with the MessageClient.
     * Null when the MessageClient was constructed with useSession set to false.
     */
    private final transient FlexClient flexClient;

    /**
     * The FlexSession associated with the MessageClient.
     * Not final because this needs to be reset if the subscription fails over to a new endpoint.
     * Null when the MessageClient was constructed with useSession set to false.
     */
    private transient FlexSession flexSession;

    /**
     * Flag used to break cycles during invalidation.
     */
    private boolean invalidating;

    /**
     * The lock to use to guard all state changes for the MessageClient.
     * The constructor replaces it with a fresh instance when useSession is false.
     */
    protected Object lock = new Object();

    /**
     * Flag indicating whether the MessageClient is attempting to notify the remote client of
     * its invalidation.
     */
    private volatile boolean attemptingInvalidationClientNotification;

    /**
     * A counter used to control invalidation for a MessageClient that has multiple
     * subscriptions to its destination.
     * Unsubscribing from one will not invalidate the MessageClient as long as other
     * subscriptions remain active.
     * Changed only by incrementReferences and decrementReferences, both under <code>lock</code>.
     */
    private transient int numReferences;

    /**
     * A set of all of the subscriptions managed by this message client.
     */
    protected final Set<SubscriptionInfo> subscriptions = new CopyOnWriteArraySet<SubscriptionInfo>();

    /**
     * Flag indicating whether this client is valid or not.
+ */ + protected boolean valid; + + /** + * Flag that indicates whether the MessageClient has a per-destination subscription timeout. + * If false, the MessageClient will remain valid until its associated FlexSession is invalidated. + */ + private volatile boolean willTimeout; + + /** + * Has anyone explicitly registered this message client. This indicates that + * there is a reference to this MessageClient which is not an explicit subscription. + * This is a hook for FDMS and other adapters which want to use pushMessageToClients + * with clientIds but that do not want the subscription manager to manage subscriptions + * for them. + */ + private volatile boolean registered = false; + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Returns the clientId for the MessageClient. + * + * @return The clientId for the MessageClient. + */ + public Object getClientId() + { + return clientId; // Field is final; no need to sync. + } + + /** + * Returns the destination the MessageClient is subscribed to. + * + * @return The destination the MessageClient is subscribed to. + */ + public Destination getDestination() + { + return destination; // Field is final; no need to sync. + } + + /** + * Returns the id of the destination the MessageClient is subscribed to. + * + * @return The id of the destination the MessageClient is subscribed to. + */ + public String getDestinationId() + { + return destinationId; // Field is final; no need to sync. + } + + /** + * Returns the Id for the endpoint the MessageClient subscription was created over. + * + * @return The Id for the endpoint the MessageClient subscription was created over. + */ + public String getEndpointId() + { + return endpointId; // Field is final; no need to sync. + } + + /** + * Returns the FlexClient associated with this MessageClient. 
+ * + * @return The FlexClient assocaited with this MessageClient. + */ + public FlexClient getFlexClient() + { + return flexClient; // Field is final; no need to sync. + } + + /** + * Returns the FlexSession associated with this MessageClient. + * + * @return The FlexSession associated with this MessageClient. + */ + public FlexSession getFlexSession() + { + synchronized (lock) + { + return flexSession; + } + } + + /** + * Returns the number of subscriptions associated with this MessageClient. + * + * @return The number of subscriptions associated with this MessageClient. + */ + public int getSubscriptionCount() + { + int count; + + synchronized (lock) + { + count = subscriptions != null? subscriptions.size() : 0; + } + + return count; + } + + /** + * + * This is used for FlexClient outbound queue management. When a MessageClient is invalidated + * if it is attempting to notify the client, then we must leave the outbound queue containing + * the notification in place. Otherwise, any messages queued for the subscription may be + * removed from the queue and possibly shut down immediately. + * + * @return true if the MessageClient is currently trying to notify the client about it's invalidation. + */ + public boolean isAttemptingInvalidationClientNotification() + { + return attemptingInvalidationClientNotification; + } + + /** + * + * This is set to true when the MessageClient is invalidated due to the client + * channel the subscription was established over disconnecting. + * It allows the FlexClient class to cleanup the outbound queue for the channel's + * corresponding server endpoint for the remote client, because we know that no + * currently queued messages need to be retained for delivery. 
+ * + * @param value true if the MessageClient is invalidated due to the client being disconnected + */ + public void setClientChannelDisconnected(boolean value) + { + clientChannelDisconnected = value; + } + + /** + * @return true if the MessageClient is invalidated due to the client being disconnected + */ + public boolean isClientChannelDisconnected() + { + return clientChannelDisconnected; + } + + /** + * + * This is true when some code other than the SubscriptionManager + * is maintaining subscriptions for this message client. It ensures + * that we have this MessageClient kept around until the session + * expires. + */ + public void setRegistered(boolean reg) + { + registered = reg; + } + + /** + * + */ + public boolean isRegistered() + { + return registered; + } + + /** + * Adds a MessageClient destroy listener. + * + * @see flex.messaging.MessageClientListener + * + * @param listener The listener to add. + */ + public void addMessageClientDestroyedListener(MessageClientListener listener) + { + if (listener != null) + { + checkValid(); + + if (destroyedListeners == null) + { + synchronized (lock) + { + if (destroyedListeners == null) + destroyedListeners = new CopyOnWriteArrayList(); + } + } + + destroyedListeners.addIfAbsent(listener); + } + } + + /** + * Removes a MessageClient destroyed listener. + * + * @see flex.messaging.MessageClientListener + * + * @param listener The listener to remove. + */ + public void removeMessageClientDestroyedListener(MessageClientListener listener) + { + // No need to check validity; removing a listener is always ok. + if (listener != null && destroyedListeners != null) + destroyedListeners.remove(listener); + } + + /** + * + * Adds a subscription to the subscription set for this MessageClient. + * + * @param selector The selector expression used for the subscription. + * @param subtopic The subtopic used for the subscription. 
+ * @param maxFrequency The maximum number of messages the client wants to + * receive per second (0 disables this limit). + */ + public void addSubscription(String selector, String subtopic, int maxFrequency) + { + synchronized (lock) + { + checkValid(); + + incrementReferences(); + + // Create and add the subscription to the subscriptions set. + SubscriptionInfo si = new SubscriptionInfo(selector, subtopic, maxFrequency); + subscriptions.add(si); + + registerSubscriptionWithThrottleManager(si); + } + } + + /** + * + * Registers the subscription with the outbound queue processor's throttle + * manager, if one exists. + * + * @param si The subscription info object. + */ + public void registerSubscriptionWithThrottleManager(SubscriptionInfo si) + { + // Register the destination that will setup client level outbound throttling. + ThrottleSettings ts = destination.getNetworkSettings().getThrottleSettings(); + if (ts.getOutboundPolicy() != Policy.NONE && (ts.isOutboundClientThrottleEnabled() || si.maxFrequency > 0)) + { + // Setup the client level outbound throttling, and register the destination + // only if the policy is not NONE, and a throttling limit is specified + // either at the destination or by consumer. + OutboundQueueThrottleManager throttleManager = getThrottleManager(true); + if (throttleManager != null) + throttleManager.registerDestination(destinationId, ts.getOutgoingClientFrequency(), ts.getOutboundPolicy()); + } + else if (si.maxFrequency > 0) // Let the client know that maxFrequency will be ignored. + { + if (Log.isWarn()) + Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).warn("MessageClient with clientId '" + + clientId + "' for destination '" + destinationId + + "' specified a maxFrequency value of '" + si.maxFrequency + + "' but the destination does not define a throttling policy. This value will be ignored."); + } + + // Now, register the subscription. 
+ OutboundQueueThrottleManager throttleManager = getThrottleManager(false); + if (throttleManager != null) + throttleManager.registerSubscription(destinationId, si); + } + + /** + * + * Removes a subscription from the subscription set for this MessageClient. + * + * @param selector The selector expression for the subscription. + * @param subtopic The subtopic for the subscription. + * @return true if no subscriptions remain for this MessageClient; otherwise false. + */ + public boolean removeSubscription(String selector, String subtopic) + { + synchronized (lock) + { + SubscriptionInfo si = new SubscriptionInfo(selector, subtopic); + if (subscriptions.remove(si)) + { + unregisterSubscriptionWithThrottleManager(si); + return decrementReferences(); + } + else if (Log.isError()) + { + Log.getLogger(MessageService.LOG_CATEGORY).error("Error - unable to find subscription to remove for MessageClient: " + + clientId + " selector: " + selector + " subtopic: " + subtopic); + } + return numReferences == 0; + } + } + + /** + * + * We use the same MessageClient for more than one subscription with different + * selection criteria. This tracks the number of subscriptions that are active + * so that we know when we are finished. + */ + public void incrementReferences() + { + synchronized (lock) + { + numReferences++; + } + } + + /** + * + * Decrements the numReferences variable and returns true if this was the last reference. + */ + public boolean decrementReferences() + { + synchronized (lock) + { + if (--numReferences == 0) + { + cancelTimeout(); + if (destination instanceof MessageDestination) + { + MessageDestination msgDestination = (MessageDestination)destination; + if (msgDestination.getThrottleManager() != null) + msgDestination.getThrottleManager().removeClientThrottleMark(clientId); + } + return true; + } + return false; + } + } + + /** + * + * Invoked by SubscriptionManager once the subscription state is setup completely + * for the MessageClient.. 
+ */ + public void notifyCreatedListeners() + { + // Notify MessageClient created listeners. + if (!createdListeners.isEmpty()) + { + // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions. + for (Iterator iter = createdListeners.iterator(); iter.hasNext();) + ((MessageClientListener)iter.next()).messageClientCreated(this); + } + } + + /** + * + * Invoked by SubscriptionManager while handling a subscribe request. + * If the request is updating an existing subscription the 'push' state in the associated FlexClient + * may need to be updated to ensure that the correct endpoint is used for this subscription. + * + * @param newEndpointId The id for the new endpoint that the subscription may have failed over to. + */ + public void resetEndpoint(String newEndpointId) + { + String oldEndpointId = null; + FlexSession oldSession = null; + FlexSession newSession = FlexContext.getFlexSession(); + synchronized (lock) + { + // If anything is null, or nothing has changed, no need for a reset. + if (endpointId == null || newEndpointId == null || flexSession == null || newSession == null || (endpointId.equals(newEndpointId) && flexSession.equals(newSession))) + return; + + oldEndpointId = endpointId; + endpointId = newEndpointId; + + oldSession = flexSession; + flexSession = newSession; + } + + // Unregister in order to reset the proper push settings in the re-registration below once the session association has been patched. + if (flexClient != null) + flexClient.unregisterMessageClient(this); + + // Clear out any reference to this subscription that the previously associated session has. + if (oldSession != null) + oldSession.unregisterMessageClient(this); + + // Associate the current session with this subscription. + if (flexSession != null) + flexSession.registerMessageClient(this); + + // Reset proper push settings. 
+ if (flexClient != null) + flexClient.registerMessageClient(this); + + if (Log.isDebug()) + { + String msg = "MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been reset as a result of a resubscribe."; + if (oldEndpointId != null && !oldEndpointId.equals(newEndpointId)) + msg += " Endpoint change [" + oldEndpointId + " -> " + newEndpointId + "]"; + if ((oldSession != null) && (newSession != null) && (oldSession != newSession)) // Test identity. + msg += " FlexSession change [" + oldSession.getClass().getName() + ":" + oldSession.getId() + " -> " + newSession.getClass().getName() + ":" + newSession.getId() + "]"; + + Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug(msg); + } + } + + /** + * + * Used to test whether this client should receive this message + * based on the list of subscriptions we have recorded for it. + * It must match both the subtopic and the selector expression. + * Usually this is done by the subscription manager - this logic is + * only here to maintain api compatibility with one of the variants + * of the pushMessageToClients which has subscriberIds and an evalSelector + * property. + * + * @param message The message to test. + */ + public boolean testMessage(Message message, MessageDestination destination) + { + String subtopic = (String) message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME); + String subtopicSeparator = destination.getServerSettings().getSubtopicSeparator(); + synchronized (lock) + { + for (SubscriptionInfo si : subscriptions) + { + if (si.matches(message, subtopic, subtopicSeparator)) + return true; + } + } + return false; + } + + /** + * Returns true if the MessageClient is valid; false if it has been invalidated. + * + * @return true if the MessageClient is valid; otherwise false. + */ + public boolean isValid() + { + synchronized (lock) + { + return valid; + } + } + + /** + * Invalidates the MessageClient. 
+ */ + public void invalidate() + { + invalidate(false /* don't attempt to notify the client */); + } + + /** + * Invalidates the MessageClient, and optionally attempts to notify the client that + * this subscription has been invalidated. + * This overload is used when a subscription is timed out while the client is still + * actively connected to the server but should also be used by any custom code on the server + * that invalidates MessageClients but wishes to notify the client cleanly. + * + * @param notifyClient <code>true</code> to notify the client that its subscription has been + * invalidated. + */ + public void invalidate(boolean notifyClient) + { + synchronized (lock) + { + if (!valid || invalidating) + return; // Already shutting down. + + invalidating = true; // This thread gets to shut the MessageClient down. + cancelTimeout(); + } + + // Record whether we're attempting to notify the client or not. + attemptingInvalidationClientNotification = notifyClient; + + // Build a subscription invalidation message and push to the client if it is still valid. + if (notifyClient && flexClient != null && flexClient.isValid()) + { + CommandMessage msg = new CommandMessage(); + msg.setDestination(destination.getId()); + msg.setClientId(clientId); + msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION); + Set subscriberIds = new TreeSet(); + subscriberIds.add(clientId); + try + { + if (destination instanceof MessageDestination) + { + MessageDestination msgDestination = (MessageDestination)destination; + ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, msg, false /* don't eval selector */); + } + } + catch (MessageException ignore) {} + } + + // Notify messageClientDestroyed listeners that we're being invalidated. 
+ if (destroyedListeners != null && !destroyedListeners.isEmpty()) + { + for (Iterator iter = destroyedListeners.iterator(); iter.hasNext();) + { + ((MessageClientListener)iter.next()).messageClientDestroyed(this); + } + destroyedListeners.clear(); + } + + // And generate unsubscribe messages for all of the MessageClient's subscriptions and + // route them to the destination this MessageClient is subscribed to. + // The reason we send a message to the service rather than just going straight to the SubscriptionManager + // is that some adapters manage their own subscription state (i.e. JMS) in addition to us keeping track of + // things with our SubscriptionManager. + ArrayList<CommandMessage> unsubMessages = new ArrayList<CommandMessage>(); + synchronized (lock) + { + for (SubscriptionInfo subInfo : subscriptions) + { + CommandMessage unsubMessage = new CommandMessage(); + unsubMessage.setDestination(destination.getId()); + unsubMessage.setClientId(clientId); + unsubMessage.setOperation(CommandMessage.UNSUBSCRIBE_OPERATION); + unsubMessage.setHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER, Boolean.TRUE); + unsubMessage.setHeader(CommandMessage.SELECTOR_HEADER, subInfo.selector); + unsubMessage.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subInfo.subtopic); + unsubMessages.add(unsubMessage); + } + } + // Release the lock and send the unsub messages. + for (CommandMessage unsubMessage : unsubMessages) + { + try + { + destination.getService().serviceCommand(unsubMessage); + } + catch (MessageException me) + { + if (Log.isDebug()) + Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " issued an unsubscribe message during invalidation that was not processed but will continue with invalidation. Reason: " + ExceptionUtil.toString(me)); + } + } + + synchronized (lock) + { + // If we didn't clean up all subscriptions log an error and continue with shutdown. 
+ int remainingSubscriptionCount = subscriptions.size(); + if (remainingSubscriptionCount > 0 && Log.isError()) + Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).error("MessageClient: " + getClientId() + " failed to remove " + remainingSubscriptionCount + " subscription(s) during invalidation"); + } + + // If someone registered this message client, invalidating it will free + // their reference which will typically also remove this message client. + if (registered && destination instanceof MessageDestination) + ((MessageDestination)destination).getSubscriptionManager().releaseMessageClient(this); + + synchronized (lock) + { + valid = false; + invalidating = false; + } + + if (Log.isDebug()) + Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been invalidated."); + } + + /** + * Pushes the supplied message and then invalidates the MessageClient. + * + * @param message The message to push to the client before invalidating. + * When message is null, MessageClient is invalidated silently. + */ + public void invalidate(Message message) + { + if (message != null) + { + message.setDestination(destination.getId()); + message.setClientId(clientId); + + Set subscriberIds = new TreeSet(); + subscriberIds.add(clientId); + try + { + if (destination instanceof MessageDestination) + { + MessageDestination msgDestination = (MessageDestination)destination; + ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, message, false /* don't eval selector */); + } + } + catch (MessageException ignore) {} + + invalidate(true /* attempt to notify remote client */); + } + else + { + invalidate(); + } + } + + /** + * + * Compares this MessageClient to the specified object. The result is true if + * the argument is not null and is a MessageClient instance with a matching + * clientId value. + * + * @param o The object to compare this MessageClient to. 
+ * @return true if the MessageClient is equal; otherwise false. + */ + public boolean equals(Object o) + { + if (o instanceof MessageClient) + { + MessageClient c = (MessageClient) o; + if (c != null && c.getClientId().equals(clientId)) + return true; + } + return false; + } + + /** + * + * Returns the hash code for this MessageClient. The returned value is + * the hash code for the MessageClient's clientId property. + * + * @return The hash code value for this MessageClient. + */ + @Override + public int hashCode() + { + return getClientId().hashCode(); + } + + /** + * + * The String representation of this MessageClient is returned (its clientId value). + * + * @return The clientId value for this MessageClient. + */ + @Override + public String toString() + { + return String.valueOf(clientId); + } + + //---------------------------------- + // TimeoutAbstractObject overrides + //---------------------------------- + + /** + * + * Implements TimeoutCapable. + * This method returns the timeout value configured for the MessageClient's destination. + */ + @Override + public long getTimeoutPeriod() + { + return (destination instanceof MessageDestination) ? + ((MessageDestination)destination).getSubscriptionManager().getSubscriptionTimeoutMillis() : 0; + } + + /** + * + * Implements TimeoutCapable. + * This method is invoked when the MessageClient has timed out and it + * invalidates the MessageClient. + */ + public void timeout() + { + invalidate(true /* notify client */); + } + + /** + * + * Returns true if a timeout task is running for this MessageClient. + */ + public boolean isTimingOut() + { + return willTimeout; + } + + /** + * + * Records whether a timeout task is running for this MessageClient. 
+ */ + public void setTimingOut(boolean value) + { + willTimeout = value; + } + + //-------------------------------------------------------------------------- + // + // Private Methods + // + //-------------------------------------------------------------------------- + + /** + * Utility method that tests validity and throws an exception if the instance + * has been invalidated. + */ + private void checkValid() + { + synchronized (lock) + { + if (!valid) + { + throw new RuntimeException("MessageClient has been invalidated."); // TODO - localize + } + } + } + + private OutboundQueueThrottleManager getThrottleManager(boolean create) + { + if (flexClient != null) + { + FlexClientOutboundQueueProcessor processor = flexClient.getOutboundQueueProcessor(endpointId); + if (processor != null) + return create? processor.getOrCreateOutboundQueueThrottleManager() : processor.getOutboundQueueThrottleManager(); + } + return null; + } + + private void unregisterSubscriptionWithThrottleManager(SubscriptionInfo si) + { + OutboundQueueThrottleManager throttleManager = getThrottleManager(false); + if (throttleManager != null) + throttleManager.unregisterSubscription(destinationId, si); + } + + //-------------------------------------------------------------------------- + // + // Nested Classes + // + //-------------------------------------------------------------------------- + + /** + * Represents a MessageClient's subscription to a destination. + * It captures the optional selector expression and subtopic for the + * subscription. + */ + public static class SubscriptionInfo implements Comparable + { + public String selector, subtopic; + public int maxFrequency; // maxFrequency per subscription. Not used in BlazeDS. 
+ + public SubscriptionInfo(String sel, String sub) + { + this(sel, sub, 0); + } + + public SubscriptionInfo(String sel, String sub, int maxFrequency) + { + this.selector = sel; + this.subtopic = sub; + this.maxFrequency = maxFrequency; + } + + @Override + public boolean equals(Object o) + { + if (o instanceof SubscriptionInfo) + { + SubscriptionInfo other = (SubscriptionInfo) o; + return equalStrings(other.selector, selector) && + equalStrings(other.subtopic, subtopic); + } + return false; + } + + @Override + public int hashCode() + { + return (selector == null ? 0 : selector.hashCode()) + + (subtopic == null ? 1 : subtopic.hashCode()); + } + + /** + * Compares the two subscription infos (being careful to + * ensure we compare in a consistent way if the arguments + * are switched). + * @param o the object to compare + * @return int the compare result + */ + public int compareTo(Object o) + { + SubscriptionInfo other = (SubscriptionInfo) o; + int result; + + if ((result = compareStrings(other.selector, selector)) != 0) + return result; + else if ((result = compareStrings(other.subtopic, subtopic)) != 0) + return result; + + return 0; + } + + /** + * Check whether the message matches with selected subtopic. + * @param message current message + * @param subtopicToMatch subtopc string + * @param subtopicSeparator suptopic separator + * @return true if the message matches the subtopic + */ + public boolean matches(Message message, String subtopicToMatch, String subtopicSeparator) + { + if ((subtopicToMatch == null && subtopic != null) || (subtopicToMatch != null && subtopic == null)) + return false; // If either defines a subtopic, they both must define one. + + // If both define a subtopic, they must match. 
+ if (subtopicToMatch != null && subtopic != null) + { + Subtopic consumerSubtopic = new Subtopic(subtopic, subtopicSeparator); + Subtopic messageSubtopic = new Subtopic(subtopicToMatch, subtopicSeparator); + if (!consumerSubtopic.matches(messageSubtopic)) + return false; // Not a match. + } + + if (selector == null) + return true; + + JMSSelector jmsSelector = new JMSSelector(selector); + try + { + if (jmsSelector.match(message)) + return true; + } + catch (JMSSelectorException jmse) + { + // Log a warning for this client's selector and continue + if (Log.isWarn()) + { + Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error processing message selector: " + + jmse.toString() + StringUtils.NEWLINE + + " incomingMessage: " + message + StringUtils.NEWLINE + + " selector: " + selector + StringUtils.NEWLINE); + } + } + return false; + } + + /** + * Returns a String representation of the subscription info. + * @return String the string representation of the subscription info + */ + public String toString() + { + StringBuffer sb = new StringBuffer(); + sb.append("Subtopic: " + subtopic + StringUtils.NEWLINE); + sb.append("Selector: " + selector + StringUtils.NEWLINE); + if (maxFrequency > 0) + sb.append("maxFrequency: " + maxFrequency); + return sb.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/MessageClientListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageClientListener.java b/core/src/flex/messaging/MessageClientListener.java new file mode 100644 index 0000000..7cfdb90 --- /dev/null +++ b/core/src/flex/messaging/MessageClientListener.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +/** + * Interface to be notified when a MessageClient is created or destroyed. Implementations of this interface + * may add themselves as listeners statically via <code>MessageClient.addMessageClientCreatedListener()</code>. + * To listen for MessageClient destruction, the implementation class instance must add itself as a listener to + * a specific MessageClient instance via the <code>addMessageClientDestroyedListener()</code> method. + */ +public interface MessageClientListener +{ + /** + * Notification that a MessageClient was created. + * + * @param messageClient The MessageClient that was created. + */ + void messageClientCreated(MessageClient messageClient); + + /** + * Notification that a MessageClient is about to be destroyed. + * + * @param messageClient The MessageClient that will be destroyed. 
+ */ + void messageClientDestroyed(MessageClient messageClient); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/MessageDestination.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageDestination.java b/core/src/flex/messaging/MessageDestination.java new file mode 100644 index 0000000..141ac8a --- /dev/null +++ b/core/src/flex/messaging/MessageDestination.java @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package flex.messaging; + +import flex.management.runtime.messaging.MessageDestinationControl; +import flex.management.runtime.messaging.services.messaging.SubscriptionManagerControl; +import flex.management.runtime.messaging.services.messaging.ThrottleManagerControl; +import flex.messaging.config.ConfigurationConstants; +import flex.messaging.config.ConfigurationException; +import flex.messaging.config.DestinationSettings; +import flex.messaging.config.ThrottleSettings; +import flex.messaging.config.ConfigMap; +import flex.messaging.config.NetworkSettings; +import flex.messaging.config.ServerSettings; +import flex.messaging.config.ThrottleSettings.Policy; +import flex.messaging.log.LogCategories; +import flex.messaging.services.MessageService; +import flex.messaging.services.Service; +import flex.messaging.services.messaging.SubscriptionManager; +import flex.messaging.services.messaging.RemoteSubscriptionManager; +import flex.messaging.services.messaging.ThrottleManager; +import flex.messaging.services.messaging.MessagingConstants; +import flex.messaging.util.ClassUtil; + +/** + * A logical reference to a MessageDestination. 
+ */ +public class MessageDestination extends FactoryDestination +{ + static final long serialVersionUID = -2016911808141319012L; + + /** Log category for <code>MessageDestination</code>.*/ + public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE; + + // Errors + private static final int UNSUPPORTED_POLICY = 10124; + + // Destination properties + private transient ServerSettings serverSettings; + + // Destination internal + private transient SubscriptionManager subscriptionManager; + private transient RemoteSubscriptionManager remoteSubscriptionManager; + private transient ThrottleManager throttleManager; + + private transient MessageDestinationControl controller; + + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * Constructs an unmanaged <code>MessageDestination</code> instance. + */ + public MessageDestination() + { + this(false); + } + + /** + * Constructs a <code>MessageDestination</code> with the indicated management. + * + * @param enableManagement <code>true</code> if the <code>MessageDestination</code> + * is manageable; otherwise <code>false</code>. + */ + public MessageDestination(boolean enableManagement) + { + super(enableManagement); + + serverSettings = new ServerSettings(); + + // Managers + subscriptionManager = new SubscriptionManager(this); + remoteSubscriptionManager = new RemoteSubscriptionManager(this); + } + + //-------------------------------------------------------------------------- + // + // Initialize, validate, start, and stop methods. + // + //-------------------------------------------------------------------------- + + /** + * Initializes the <code>MessageDestination</code> with the properties. + * If subclasses override, they must call <code>super.initialize()</code>. + * + * @param id The id of the destination. 
+ * @param properties Properties for the <code>MessageDestination</code>. + */ + @Override + public void initialize(String id, ConfigMap properties) + { + super.initialize(id, properties); + + if (properties == null || properties.size() == 0) + return; + + // Network properties + network(properties); + + // Server properties + server(properties); + } + + /** + * Sets up the throttle manager before it starts. + */ + @Override + public void start() + { + // Create the throttle manager, only if needed. + if (networkSettings.getThrottleSettings() != null) + { + ThrottleSettings settings = networkSettings.getThrottleSettings(); + if (settings.isClientThrottleEnabled() || settings.isDestinationThrottleEnabled()) + { + settings.setDestinationName(getId()); + throttleManager = createThrottleManager(); + throttleManager.setThrottleSettings(settings); + throttleManager.start(); + } + } + super.start(); + } + + /** + * Stops the subscription, remote subscription, and throttle managers and + * then calls super class's stop. + */ + @Override + public void stop() + { + if (isStarted()) + { + subscriptionManager.stop(); + remoteSubscriptionManager.stop(); + if (throttleManager != null) + throttleManager.stop(); + } + super.stop(); + } + + //-------------------------------------------------------------------------- + // + // Public Getters and Setters for Destination properties + // + //-------------------------------------------------------------------------- + + /** + * Sets the <code>NetworkSettings</code> of the <code>MessageDestination</code>. + * + * @param networkSettings The <code>NetworkSettings</code> of the <code>MessageDestination</code> + */ + @Override + public void setNetworkSettings(NetworkSettings networkSettings) + { + super.setNetworkSettings(networkSettings); + + // Set the subscription manager settings if needed. 
+ if (networkSettings.getSubscriptionTimeoutMinutes() > 0) + { + long subscriptionTimeoutMillis = networkSettings.getSubscriptionTimeoutMinutes() * 60L * 1000L; // Convert to millis. + subscriptionManager.setSubscriptionTimeoutMillis(subscriptionTimeoutMillis); + } + } + + /** + * Returns the <code>ServerSettings</code> of the <code>MessageDestination</code>. + * + * @return The <code>ServerSettings</code> of the <code>MessageDestination</code>. + */ + public ServerSettings getServerSettings() + { + return serverSettings; + } + + /** + * Sets the <code>ServerSettings</code> of the <code>MessageDestination</code>. + * + * @param serverSettings The <code>ServerSettings</code> of the <code>MessageDestination</code> + */ + public void setServerSettings(ServerSettings serverSettings) + { + this.serverSettings = serverSettings; + } + + /** + * Casts the <code>Service</code> into <code>MessageService</code> + * and calls super.setService. + * + * @param service The <code>Service</code> managing this <code>Destination</code>. + */ + @Override + public void setService(Service service) + { + MessageService messageService = (MessageService)service; + super.setService(messageService); + } + + //-------------------------------------------------------------------------- + // + // Other Public Methods + // + //-------------------------------------------------------------------------- + + /** + * + * Returns a <tt>ConfigMap</tt> of destination properties that the client + * needs. This includes properties from <code>super{@link #describeDestination(boolean)}</code> + * and it also includes outbound throttling policy that the edge server might need. + * + * @param onlyReliable Determines whether only reliable destination configuration should be returned. + * @return A <tt>ConfigMap</tt> of destination properties that the client needs. 
+ */ + @Override + public ConfigMap describeDestination(boolean onlyReliable) + { + ConfigMap destinationConfig = super.describeDestination(onlyReliable); + if (destinationConfig == null) + return null; + + if (throttleManager == null) + return destinationConfig; + + Policy outboundPolicy = throttleManager.getOutboundPolicy(); + if (outboundPolicy == null || outboundPolicy == Policy.NONE) + return destinationConfig; + + // Add the outbound throttle policy to network properties section as appropriate. + ConfigMap properties = destinationConfig.getPropertyAsMap(ConfigurationConstants.PROPERTIES_ELEMENT, null); + if (properties == null) + { + properties = new ConfigMap(); + destinationConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties); + } + + ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null); + if (network == null) + { + network = new ConfigMap(); + properties.addProperty(NetworkSettings.NETWORK_ELEMENT, network); + } + + ConfigMap throttleOutbound = new ConfigMap(); + throttleOutbound.addProperty(ThrottleSettings.ELEMENT_POLICY, throttleManager.getOutboundPolicy().toString()); + network.addProperty(ThrottleSettings.ELEMENT_OUTBOUND, throttleOutbound); + + return destinationConfig; + } + + + public SubscriptionManager getSubscriptionManager() + { + return subscriptionManager; + } + + + public RemoteSubscriptionManager getRemoteSubscriptionManager() + { + return remoteSubscriptionManager; + } + + + public ThrottleManager getThrottleManager() + { + return throttleManager; + } + + + @Override + public boolean equals(Object o) + { + if (o instanceof Destination) + { + Destination d = (Destination)o; + String serviceType1 = d.getServiceType(); + String serviceType2 = getServiceType(); + if ((serviceType1 == null && serviceType2 == null) || (serviceType1 != null && serviceType1.equals(serviceType2))) + { + String id1 = d.getId(); + String id2 = getId(); + if ((id1 == null && id2 == null) || (id1 != null && 
id1.equals(id2))) + return true; + } + } + return false; + } + + + @Override + public int hashCode() + { + return (getServiceType() == null ? 0 : getServiceType().hashCode()) * 100003 + + (getId() == null ? 0 : getId().hashCode()); + } + + + @Override + public String toString() + { + return getServiceType() + "#" + getId(); + } + + //-------------------------------------------------------------------------- + // + // Protected/Private Methods + // + //-------------------------------------------------------------------------- + + protected ThrottleManager createThrottleManager() + { + Service service = getService(); + if (service == null || service.getMessageBroker() == null) + return new ThrottleManager(); // Return the default. + + try + { + Class<? extends ThrottleManager> throttleManagerClass = service.getMessageBroker().getThrottleManagerClass(); + Object instance = ClassUtil.createDefaultInstance(throttleManagerClass, null); + if (instance instanceof ThrottleManager) + return (ThrottleManager)instance; + } + catch (Throwable t) + { + // NOWARN + } + + return new ThrottleManager(); // Return the default. + } + + protected void network(ConfigMap properties) + { + ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null); + if (network == null) + return; + + // Get implementation specific network settings, including subclasses! + NetworkSettings ns = getNetworkSettings(); + + // Subscriber timeout; first check for subscription-timeout-minutes and fallback to legacy session-timeout. 
+        // Sentinel default: when the subscription-timeout property is absent, fall back
+        // to the session-timeout property (and then to NetworkSettings.DEFAULT_TIMEOUT).
+        int useLegacyPropertyToken = -999999;
+        int subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SUBSCRIPTION_TIMEOUT_MINUTES, useLegacyPropertyToken);
+        if (subscriptionTimeoutMinutes == useLegacyPropertyToken)
+            subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SESSION_TIMEOUT, NetworkSettings.DEFAULT_TIMEOUT);
+        ns.setSubscriptionTimeoutMinutes(subscriptionTimeoutMinutes);
+
+        // Throttle Settings
+        ThrottleSettings ts = ns.getThrottleSettings();
+        ts.setDestinationName(getId());
+        throttle(ts, network);
+
+        setNetworkSettings(ns);
+    }
+
+    /**
+     * Copies the inbound and outbound throttle configuration found in the
+     * network properties into the supplied <code>ThrottleSettings</code>.
+     * A direction whose element is absent from the configuration is left untouched;
+     * a frequency that is not specified within a present element is set to 0.
+     *
+     * @param ts The <code>ThrottleSettings</code> to populate.
+     * @param network The network section of the destination configuration.
+     */
+    protected void throttle(ThrottleSettings ts, ConfigMap network)
+    {
+        ConfigMap inbound = network.getPropertyAsMap(ThrottleSettings.ELEMENT_INBOUND, null);
+        if (inbound != null)
+        {
+            ThrottleSettings.Policy policy = getPolicyFromThrottleSettings(inbound);
+            ts.setInboundPolicy(policy);
+            int destFreq = inbound.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0);
+            ts.setIncomingDestinationFrequency(destFreq);
+            int clientFreq = inbound.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0);
+            ts.setIncomingClientFrequency(clientFreq);
+        }
+
+        ConfigMap outbound = network.getPropertyAsMap(ThrottleSettings.ELEMENT_OUTBOUND, null);
+        if (outbound != null)
+        {
+            ThrottleSettings.Policy policy = getPolicyFromThrottleSettings(outbound);
+            ts.setOutboundPolicy(policy);
+            int destFreq = outbound.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0);
+            ts.setOutgoingDestinationFrequency(destFreq);
+            int clientFreq = outbound.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0);
+            ts.setOutgoingClientFrequency(clientFreq);
+        }
+    }
+
+    /**
+     * Reads the throttle policy from an inbound or outbound throttle element.
+     *
+     * @param settings The inbound or outbound throttle configuration.
+     * @return The parsed policy, or <code>ThrottleSettings.Policy.NONE</code> when
+     * no policy is specified.
+     * @throws ConfigurationException If the policy string cannot be parsed; the
+     * message names this destination and the offending value, and the parser's
+     * exception is attached as the root cause.
+     */
+    private ThrottleSettings.Policy getPolicyFromThrottleSettings(ConfigMap settings)
+    {
+        String policyString = settings.getPropertyAsString(ThrottleSettings.ELEMENT_POLICY, null);
+        if (policyString == null)
+            return ThrottleSettings.Policy.NONE;
+        try
+        {
+            return ThrottleSettings.parsePolicy(policyString);
+        }
+        catch (ConfigurationException exception)
+        {
+            ConfigurationException ce = new ConfigurationException();
+            ce.setMessage(UNSUPPORTED_POLICY, new Object[] {getId(), policyString});
+            // Keep the parser's failure so its details and stack trace are not lost.
+            ce.setRootCause(exception);
+            throw ce;
+        }
+    }
+
+    /**
+     * Applies the server element of the destination configuration to the
+     * server settings. Does nothing when the properties contain no server element.
+     *
+     * @param properties The destination configuration properties.
+     */
+    protected void server(ConfigMap properties)
+    {
+        ConfigMap server = properties.getPropertyAsMap(DestinationSettings.SERVER_ELEMENT, null);
+        if (server == null)
+            return;
+
+        long ttl = server.getPropertyAsLong(MessagingConstants.TIME_TO_LIVE_ELEMENT, -1);
+        serverSettings.setMessageTTL(ttl);
+
+        boolean durable = server.getPropertyAsBoolean(MessagingConstants.IS_DURABLE_ELEMENT, false);
+        serverSettings.setDurable(durable);
+
+        boolean allowSubtopics = server.getPropertyAsBoolean(MessagingConstants.ALLOW_SUBTOPICS_ELEMENT, false);
+        serverSettings.setAllowSubtopics(allowSubtopics);
+
+        boolean disallowWildcardSubtopics = server.getPropertyAsBoolean(MessagingConstants.DISALLOW_WILDCARD_SUBTOPICS_ELEMENT, false);
+        serverSettings.setDisallowWildcardSubtopics(disallowWildcardSubtopics);
+
+        // -1 marks "not configured"; the priority is only set when a value is present.
+        int priority = server.getPropertyAsInt(MessagingConstants.MESSAGE_PRIORITY, -1);
+        if (priority != -1)
+            serverSettings.setPriority(priority);
+
+        String subtopicSeparator = server.getPropertyAsString(MessagingConstants.SUBTOPIC_SEPARATOR_ELEMENT, MessagingConstants.DEFAULT_SUBTOPIC_SEPARATOR);
+        serverSettings.setSubtopicSeparator(subtopicSeparator);
+
+        String routingMode = server.getPropertyAsString(MessagingConstants.CLUSTER_MESSAGE_ROUTING, "server-to-server");
+        serverSettings.setBroadcastRoutingMode(routingMode);
+    }
+
+    /**
+     * Returns the log category of the <code>MessageDestination</code>.
+     *
+     * @return The log category of the component.
+     */
+    @Override
+    protected String getLogCategory()
+    {
+        return LOG_CATEGORY;
+    }
+
+    /**
+     * Invoked automatically to allow the <code>MessageDestination</code> to setup its corresponding
+     * MBean control.
+     *
+     * @param service The <code>Service</code> that manages this <code>MessageDestination</code>.
+     */
+    @Override
+    protected void setupDestinationControl(Service service)
+    {
+        controller = new MessageDestinationControl(this, service.getControl());
+        controller.register();
+        setControl(controller);
+        setupThrottleManagerControl(controller);
+        setupSubscriptionManagerControl(controller);
+    }
+
+    /**
+     * Creates and registers a control for the throttle manager, marks the manager
+     * as managed, and records the control's object name on the destination control.
+     * Does nothing when there is no throttle manager.
+     *
+     * @param destinationControl The control of this destination, used as the parent
+     * of the throttle manager control.
+     */
+    protected void setupThrottleManagerControl(MessageDestinationControl destinationControl)
+    {
+        if (throttleManager != null)
+        {
+            ThrottleManagerControl throttleManagerControl = new ThrottleManagerControl(throttleManager, destinationControl);
+            throttleManagerControl.register();
+            throttleManager.setControl(throttleManagerControl);
+            throttleManager.setManaged(true);
+            destinationControl.setThrottleManager(throttleManagerControl.getObjectName());
+        }
+    }
+
+    /**
+     * Creates and registers a control for the subscription manager, marks the manager
+     * as managed, and records the control's object name on the destination control.
+     *
+     * @param destinationControl The control of this destination, used as the parent
+     * of the subscription manager control.
+     */
+    private void setupSubscriptionManagerControl(MessageDestinationControl destinationControl)
+    {
+        SubscriptionManagerControl subscriptionManagerControl = new SubscriptionManagerControl(getSubscriptionManager(), destinationControl);
+        subscriptionManagerControl.register();
+        getSubscriptionManager().setControl(subscriptionManagerControl);
+        getSubscriptionManager().setManaged(true);
+        destinationControl.setSubscriptionManager(subscriptionManagerControl.getObjectName());
+    }
+}
