http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/MessageBrokerServlet.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageBrokerServlet.java b/core/src/flex/messaging/MessageBrokerServlet.java deleted file mode 100644 index 0e8efb0..0000000 --- a/core/src/flex/messaging/MessageBrokerServlet.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging; - -import flex.management.MBeanLifecycleManager; -import flex.management.MBeanServerLocatorFactory; -import flex.messaging.config.ConfigurationManager; -import flex.messaging.config.FlexConfigurationManager; -import flex.messaging.config.MessagingConfiguration; -import flex.messaging.endpoints.Endpoint; -import flex.messaging.io.SerializationContext; -import flex.messaging.io.TypeMarshallingContext; -import flex.messaging.log.HTTPRequestLog; -import flex.messaging.log.Log; -import flex.messaging.log.LogCategories; -import flex.messaging.log.Logger; -import flex.messaging.log.LoggingHttpServletRequestWrapper; -import flex.messaging.log.ServletLogTarget; -import flex.messaging.services.AuthenticationService; -import flex.messaging.util.ClassUtil; -import flex.messaging.util.ExceptionUtil; -import flex.messaging.util.Trace; - -import javax.servlet.ServletConfig; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.security.Principal; -import java.util.concurrent.ConcurrentHashMap; - -/** - * The MessageBrokerServlet bootstraps the MessageBroker, - * adds endpoints to it, and starts the broker. The servlet - * also acts as a facade for all http-based endpoints, in that - * the servlet receives the http request and then delegates to - * an endpoint that can handle the request's content type. This - * does not occur for non-http endpoints, such as the rtmp endpoint. 
- * - * @see flex.messaging.MessageBroker - * - */ -public class MessageBrokerServlet extends HttpServlet -{ - static final long serialVersionUID = -5293855229461612246L; - - public static final String LOG_CATEGORY_STARTUP_BROKER = LogCategories.STARTUP_MESSAGEBROKER; - private static final String STRING_UNDEFINED_APPLICATION = "undefined"; - - private MessageBroker broker; - private HttpFlexSessionProvider httpFlexSessionProvider; - private static String FLEXDIR = "/WEB-INF/flex/"; - private boolean log_errors = false; - - /** - * Initializes the servlet in its web container, then creates - * the MessageBroker and adds Endpoints and Services to that broker. - * This servlet may keep a reference to an endpoint if it needs to - * delegate to it in the <code>service</code> method. - */ - public void init(ServletConfig servletConfig) throws ServletException - { - super.init(servletConfig); - - // allocate thread local variables - createThreadLocals(); - - // Set the servlet config as thread local - FlexContext.setThreadLocalObjects(null, null, null, null, null, servletConfig); - - ServletLogTarget.setServletContext(servletConfig.getServletContext()); - - ClassLoader loader = getClassLoader(); - - if ("true".equals(servletConfig.getInitParameter("useContextClassLoader"))) - { - loader = Thread.currentThread().getContextClassLoader(); - } - - // Should we wrap http request for later error logging? - log_errors = HTTPRequestLog.init(getServletContext()); - - // Start the broker - try - { - // Get the configuration manager - ConfigurationManager configManager = loadMessagingConfiguration(servletConfig); - - // Load configuration - MessagingConfiguration config = configManager.getMessagingConfiguration(servletConfig); - - // Set up logging system ahead of everything else. - config.createLogAndTargets(); - - // Create broker. 
- broker = config.createBroker(servletConfig.getInitParameter("messageBrokerId"), loader); - - // Set the servlet config as thread local - FlexContext.setThreadLocalObjects(null, null, broker, null, null, servletConfig); - - setupPathResolvers(); - - // Set initial servlet context on broker - broker.setServletContext(servletConfig.getServletContext()); - - Logger logger = Log.getLogger(ConfigurationManager.LOG_CATEGORY); - if (Log.isInfo()) - { - logger.info(VersionInfo.buildMessage()); - } - - // Create endpoints, services, security, and logger on the broker based on configuration - config.configureBroker(broker); - - long timeBeforeStartup = 0; - if (Log.isDebug()) - { - timeBeforeStartup = System.currentTimeMillis(); - Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is starting.", - new Object[]{broker.getId()}); - } - - //initialize the httpSessionToFlexSessionMap - synchronized(HttpFlexSession.mapLock) - { - if (servletConfig.getServletContext().getAttribute(HttpFlexSession.SESSION_MAP) == null) - servletConfig.getServletContext().setAttribute(HttpFlexSession.SESSION_MAP, new ConcurrentHashMap()); - } - - broker.start(); - - if (Log.isDebug()) - { - long timeAfterStartup = System.currentTimeMillis(); - Long diffMillis = timeAfterStartup - timeBeforeStartup; - Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is ready (startup time: '{1}' ms)", - new Object[]{broker.getId(), diffMillis}); - } - - // Report replaced tokens - configManager.reportTokens(); - - // Report any unused properties. - config.reportUnusedProperties(); - - // Setup provider for FlexSessions that wrap underlying J2EE HttpSessions. 
- httpFlexSessionProvider = new HttpFlexSessionProvider(); - broker.getFlexSessionManager().registerFlexSessionProvider(HttpFlexSession.class, httpFlexSessionProvider); - - // clear the broker and servlet config as this thread is done - FlexContext.clearThreadLocalObjects(); - } - catch (Throwable t) - { - // On any unhandled exception destroy the broker, log it and rethrow. - String applicationName = servletConfig.getServletContext().getServletContextName(); - if (applicationName == null) - applicationName = STRING_UNDEFINED_APPLICATION; - - System.err.println("**** MessageBrokerServlet in application '" + applicationName - + "' failed to initialize due to runtime exception: " - + ExceptionUtil.exceptionFollowedByRootCausesToString(t)); - destroy(); - // We used to throw UnavailableException, but Weblogic didn't mark the webapp as failed. See bug FBR-237 - throw new ServletException(t); - } - } - - private void setupPathResolvers() - { - setupExternalPathResolver(); - setupInternalPathResolver(); - } - - private void setupExternalPathResolver() - { - broker.setExternalPathResolver( - new MessageBroker.PathResolver() - { - public InputStream resolve(String filename) throws FileNotFoundException - { - return new FileInputStream(new File(filename)); - } - } - ); - } - - private void setupInternalPathResolver() - { - broker.setInternalPathResolver( - new MessageBroker.InternalPathResolver() - { - public InputStream resolve(String filename) - { - return getServletContext().getResourceAsStream(FLEXDIR + filename); - } - } - ); - } - - private static ConfigurationManager loadMessagingConfiguration(ServletConfig servletConfig) - { - ConfigurationManager manager = null; - Class managerClass; - String className; - - // Check for Custom Configuration Manager Specification - if (servletConfig != null) - { - String p = servletConfig.getInitParameter("services.configuration.manager"); - if (p != null) - { - className = p.trim(); - try - { - managerClass = 
ClassUtil.createClass(className); - manager = (ConfigurationManager)managerClass.newInstance(); - } - catch (Throwable t) - { - if (Trace.config) // Log is not initialized yet. - Trace.trace("Could not load configuration manager as: " + className); - } - } - } - - if (manager == null) - { - manager = new FlexConfigurationManager(); - } - - return manager; - } - - /** - * Stops all endpoints in the MessageBroker, giving them a chance - * to perform any endpoint-specific clean up. - */ - public void destroy() - { - if (broker != null) - { - broker.stop(); - if (broker.isManaged()) - { - MBeanLifecycleManager.unregisterRuntimeMBeans(broker); - } - // release static thread locals - destroyThreadLocals(); - } - } - - /** - * Handle an incoming request, and delegate to an endpoint based on - * content type, if appropriate. The content type mappings for endpoints - * are not externally configurable, and currently the AmfEndpoint - * is the only delegate. - */ - public void service(HttpServletRequest req, HttpServletResponse res) - { - if (log_errors) - { - // Create a wrapper for the request object so we can save the body content - LoggingHttpServletRequestWrapper wrapper = new LoggingHttpServletRequestWrapper(req); - req = wrapper; - - try - { - // Read the body content - wrapper.doReadBody(); - } - catch (IOException ignore) - { - // ignore, the wrapper will preserve what content we were able to read. - } - } - - try - { - // Update thread locals - broker.initThreadLocals(); - // Set this first so it is in place for the session creation event. The - // current session is set by the FlexSession stuff right when it is available. - // The threadlocal FlexClient is set up during message deserialization in the - // MessageBrokerFilter. 
- FlexContext.setThreadLocalObjects(null, null, broker, req, res, getServletConfig()); - - HttpFlexSession fs = httpFlexSessionProvider.getOrCreateSession(req); - Principal principal; - if(FlexContext.isPerClientAuthentication()) - { - principal = FlexContext.getUserPrincipal(); - } - else - { - principal = fs.getUserPrincipal(); - } - - if (principal == null && req.getHeader("Authorization") != null) - { - String encoded = req.getHeader("Authorization"); - if (encoded.indexOf("Basic") > -1) - { - encoded = encoded.substring(6); //Basic.length()+1 - try - { - ((AuthenticationService)broker.getService(AuthenticationService.ID)).decodeAndLogin(encoded, broker.getLoginManager()); - } - catch (Exception e) - { - if (Log.isDebug()) - Log.getLogger(LogCategories.SECURITY).info("Authentication service could not decode and login: " + e.getMessage()); - } - } - } - - String contextPath = req.getContextPath(); - String pathInfo = req.getPathInfo(); - String endpointPath = req.getServletPath(); - if (pathInfo != null) - endpointPath = endpointPath + pathInfo; - - Endpoint endpoint; - try - { - endpoint = broker.getEndpoint(endpointPath, contextPath); - } - catch (MessageException me) - { - if (Log.isInfo()) - Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Received invalid request for endpoint path '{0}'.", new Object[] {endpointPath}); - - if (!res.isCommitted()) - { - try - { - res.sendError(HttpServletResponse.SC_NOT_FOUND); - } - catch (IOException ignore) - {} - } - - return; - } - - try - { - if (Log.isInfo()) - { - Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request.", - new Object[] {endpoint.getId()}); - } - endpoint.service(req, res); - } - catch (UnsupportedOperationException ue) - { - if (Log.isInfo()) - { - Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request for an unsupported operation.", - new Object[] {endpoint.getId()}, - ue); - } - - if (!res.isCommitted()) - { - try - { - 
res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); - } - catch (IOException ignore) - {} - } - } - } - catch (Throwable t) - { - // Final resort catch block as recommended by Fortify as a potential System info leak - try - { - Log.getLogger(LogCategories.ENDPOINT_GENERAL).error("Unexpected error encountered in Message Broker servlet", t); - res.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - } - catch (IOException ignore) - { - // ignore - } - - } - finally - { - if (log_errors) - { - String info = (String) req.getAttribute(HTTPRequestLog.HTTP_ERROR_INFO); - if (info != null) - { - // Log the HttpRequest data - System.out.println("Exception occurred while processing HTTP request: " + info + ", request details logged in " + HTTPRequestLog.getFileName()); - HTTPRequestLog.outputRequest(info, req); - } - } - - FlexContext.clearThreadLocalObjects(); - } - } - - /** - * Hook for subclasses to override the class loader to use for loading user defined classes. - * - * @return the class loader for this class - */ - protected ClassLoader getClassLoader() - { - return this.getClass().getClassLoader(); - } - - - // Call ONLY on servlet startup - public static void createThreadLocals() - { - // allocate static thread local objects - FlexContext.createThreadLocalObjects(); - SerializationContext.createThreadLocalObjects(); - TypeMarshallingContext.createThreadLocalObjects(); - } - - - // Call ONLY on servlet shutdown - protected static void destroyThreadLocals() - { - // clear static member variables - Log.clear(); - MBeanServerLocatorFactory.clear(); - - // Destroy static thread local objects - FlexContext.releaseThreadLocalObjects(); - SerializationContext.releaseThreadLocalObjects(); - TypeMarshallingContext.releaseThreadLocalObjects(); - } - -}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/MessageClient.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageClient.java b/core/src/flex/messaging/MessageClient.java deleted file mode 100644 index 56768e7..0000000 --- a/core/src/flex/messaging/MessageClient.java +++ /dev/null @@ -1,1148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; - -import flex.messaging.client.FlexClient; -import flex.messaging.client.FlexClientOutboundQueueProcessor; -import flex.messaging.client.OutboundQueueThrottleManager; -import flex.messaging.config.ThrottleSettings; -import flex.messaging.config.ThrottleSettings.Policy; -import flex.messaging.log.LogCategories; -import flex.messaging.log.Log; -import flex.messaging.messages.AsyncMessage; -import flex.messaging.messages.CommandMessage; -import flex.messaging.messages.Message; -import flex.messaging.services.MessageService; -import flex.messaging.services.messaging.Subtopic; -import flex.messaging.services.messaging.selector.JMSSelector; -import flex.messaging.services.messaging.selector.JMSSelectorException; -import flex.messaging.util.ExceptionUtil; -import flex.messaging.util.TimeoutAbstractObject; -import flex.messaging.util.StringUtils; - -/** - * Represents a client-side MessageAgent instance. - * Currently a server-side MessageClient is only created if its client-side counterpart has subscribed - * to a destination for pushed data (e.g. Consumer). Client-side Producers do not result in the creation of - * corresponding server-side MessageClient instances. - * - * Client-side MessageAgents communicate with the server over a Channel that corresponds to a FlexSession. - * Server-side MessageClient instances are always created in the context of a FlexSession and when the FlexSession - * is invalidated any associated MessageClients are invalidated as well. - * - * MessageClients may also be timed out on a per-destination basis and this is based on subscription inactivity. 
- * If no messages are pushed to the MessageClient within the destination's subscription timeout period the - * MessageClient will be shutdown even if the associated FlexSession is still active and connected. - * Per-destination subscription timeout is an optional configuration setting, and should only be used when inactive - * subscriptions should be shut down opportunistically to preserve server resources. - */ -public class MessageClient extends TimeoutAbstractObject implements Serializable -{ - //-------------------------------------------------------------------------- - // - // Public Static Variables - // - //-------------------------------------------------------------------------- - - /** - * Log category for MessageClient related messages. - */ - public static final String MESSAGE_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_MESSAGECLIENT; - - //-------------------------------------------------------------------------- - // - // Static Constants - // - //-------------------------------------------------------------------------- - - /** - * Serializable to support broadcasting subscription state across the cluster for - * optimized message routing. - */ - static final long serialVersionUID = 3730240451524954453L; - - //-------------------------------------------------------------------------- - // - // Static Variables - // - //-------------------------------------------------------------------------- - - /** - * The list of MessageClient created listeners. - */ - private static final CopyOnWriteArrayList<MessageClientListener> createdListeners = new CopyOnWriteArrayList<MessageClientListener>(); - - //-------------------------------------------------------------------------- - // - // Static Methods - // - //-------------------------------------------------------------------------- - - /** - * Adds a MessageClient created listener. - * - * @see flex.messaging.MessageClientListener - * - * @param listener The listener to add. 
- */ - public static void addMessageClientCreatedListener(MessageClientListener listener) - { - if (listener != null) - createdListeners.addIfAbsent(listener); - } - - /** - * Removes a MessageClient created listener. - * - * @see flex.messaging.MessageClientListener - * - * @param listener The listener to remove. - */ - public static void removeMessageClientCreatedListener(MessageClientListener listener) - { - if (listener != null) - createdListeners.remove(listener); - } - - //-------------------------------------------------------------------------- - // - // Private Static Methods - // - //-------------------------------------------------------------------------- - - /** - * Utility method. - */ - private static boolean equalStrings(String a, String b) - { - return a == b || (a != null && a.equals(b)); - } - - /** - * Utility method. - */ - static int compareStrings(String a, String b) - { - if (a == b) - return 0; - - if (a != null && b != null) - return a.compareTo(b); - - if (a == null) - return -1; - - return 1; - } - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * - * Constructs a new MessageClient for local use. - * - * @param clientId The clientId for the MessageClient. - * @param destination The destination the MessageClient is subscribed to. - * @param endpointId The Id of the endpoint this MessageClient subscription was created over. - */ - public MessageClient(Object clientId, Destination destination, String endpointId) - { - this(clientId, destination, endpointId, true); - } - - /** - * - * Constructs a new MessageClient. - * - * @param clientId The clientId for the MessageClient. - * @param destination The destination the MessageClient is subscribed to. - * @param endpointId The Id of the endpoint this MessageClient subscription was created over. 
- * @param useSession RemoteMessageClient instances should not be associated with a FlexSession (pass false). - */ - public MessageClient(Object clientId, Destination destination, String endpointId, boolean useSession) - { - valid = true; - this.clientId = clientId; - this.destination = destination; - this.endpointId = endpointId; - destinationId = destination.getId(); - updateLastUse(); // Initialize last use timestamp to construct time. - - /* If this is for a remote server, we do not associate with the session. */ - if (useSession) - { - flexSession = FlexContext.getFlexSession(); - flexSession.registerMessageClient(this); - - flexClient = FlexContext.getFlexClient(); - flexClient.registerMessageClient(this); - - // SubscriptionManager will notify the created listeners, once - // subscription state is setup completely. - // notifyCreatedListeners(); - } - else - { - flexClient = null; - flexSession = null; - // Use an instance level lock. - lock = new Object(); - // On a remote server we don't notify created listeners. - } - - if (Log.isDebug()) - Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient created with clientId '" + this.clientId + "' for destination '" + destinationId + "'."); - } - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - /** - * This flag is set to true when the client channel that this subscription was - * established over is disconnected. - * It supports cleaning up per-endpoint outbound queues maintained by the FlexClient. - * If the client notifies the server that its channel is disconnecting, the FlexClient - * does not need to maintain an outbound queue containing a subscription invalidation - * message for this MessageClient to send to the client. - */ - private volatile boolean clientChannelDisconnected; - - /** - * The clientId for the MessageClient. 
- * This value is specified by the client directly or is autogenerated on the client. - */ - protected final Object clientId; - - /** - * Internal reference to the associated Destination; don't expose this in the public API. - */ - protected final Destination destination; - - /** - * The destination the MessageClient is subscribed to. - */ - protected final String destinationId; - - /** - * The set of session destroy listeners to notify when the session is destroyed. - */ - private transient volatile CopyOnWriteArrayList destroyedListeners; - - /** - * The Id for the endpoint this MessageClient subscription was created over. - */ - private String endpointId; - - /** - * The FlexClient associated with the MessageClient. - */ - private final transient FlexClient flexClient; - - /** - * The FlexSession associated with the MessageClient. - * Not final because this needs to be reset if the subscription fails over to a new endpoint. - */ - private transient FlexSession flexSession; - - /** - * Flag used to break cycles during invalidation. - */ - private boolean invalidating; - - /** - * The lock to use to guard all state changes for the MessageClient. - */ - protected Object lock = new Object(); - - /** - * Flag indicating whether the MessageClient is attempting to notify the remote client of - * its invalidation. - */ - private volatile boolean attemptingInvalidationClientNotification; - - /** - * A counter used to control invalidation for a MessageClient that has multiple - * subscriptions to its destination. - * Unsubscribing from one will not invalidate the MessageClient as long as other - * subscriptions remain active. - */ - private transient int numReferences; - - /** - * A set of all of the subscriptions managed by this message client. - */ - protected final Set<SubscriptionInfo> subscriptions = new CopyOnWriteArraySet<SubscriptionInfo>(); - - /** - * Flag indicating whether this client is valid or not. 
- */ - protected boolean valid; - - /** - * Flag that indicates whether the MessageClient has a per-destination subscription timeout. - * If false, the MessageClient will remain valid until its associated FlexSession is invalidated. - */ - private volatile boolean willTimeout; - - /** - * Has anyone explicitly registered this message client. This indicates that - * there is a reference to this MessageClient which is not an explicit subscription. - * This is a hook for FDMS and other adapters which want to use pushMessageToClients - * with clientIds but that do not want the subscription manager to manage subscriptions - * for them. - */ - private volatile boolean registered = false; - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Returns the clientId for the MessageClient. - * - * @return The clientId for the MessageClient. - */ - public Object getClientId() - { - return clientId; // Field is final; no need to sync. - } - - /** - * Returns the destination the MessageClient is subscribed to. - * - * @return The destination the MessageClient is subscribed to. - */ - public Destination getDestination() - { - return destination; // Field is final; no need to sync. - } - - /** - * Returns the id of the destination the MessageClient is subscribed to. - * - * @return The id of the destination the MessageClient is subscribed to. - */ - public String getDestinationId() - { - return destinationId; // Field is final; no need to sync. - } - - /** - * Returns the Id for the endpoint the MessageClient subscription was created over. - * - * @return The Id for the endpoint the MessageClient subscription was created over. - */ - public String getEndpointId() - { - return endpointId; // Field is final; no need to sync. - } - - /** - * Returns the FlexClient associated with this MessageClient. 
- * - * @return The FlexClient assocaited with this MessageClient. - */ - public FlexClient getFlexClient() - { - return flexClient; // Field is final; no need to sync. - } - - /** - * Returns the FlexSession associated with this MessageClient. - * - * @return The FlexSession associated with this MessageClient. - */ - public FlexSession getFlexSession() - { - synchronized (lock) - { - return flexSession; - } - } - - /** - * Returns the number of subscriptions associated with this MessageClient. - * - * @return The number of subscriptions associated with this MessageClient. - */ - public int getSubscriptionCount() - { - int count; - - synchronized (lock) - { - count = subscriptions != null? subscriptions.size() : 0; - } - - return count; - } - - /** - * - * This is used for FlexClient outbound queue management. When a MessageClient is invalidated - * if it is attempting to notify the client, then we must leave the outbound queue containing - * the notification in place. Otherwise, any messages queued for the subscription may be - * removed from the queue and possibly shut down immediately. - * - * @return true if the MessageClient is currently trying to notify the client about it's invalidation. - */ - public boolean isAttemptingInvalidationClientNotification() - { - return attemptingInvalidationClientNotification; - } - - /** - * - * This is set to true when the MessageClient is invalidated due to the client - * channel the subscription was established over disconnecting. - * It allows the FlexClient class to cleanup the outbound queue for the channel's - * corresponding server endpoint for the remote client, because we know that no - * currently queued messages need to be retained for delivery. 
- * - * @param value true if the MessageClient is invalidated due to the client being disconnected - */ - public void setClientChannelDisconnected(boolean value) - { - clientChannelDisconnected = value; - } - - /** - * @return true if the MessageClient is invalidated due to the client being disconnected - */ - public boolean isClientChannelDisconnected() - { - return clientChannelDisconnected; - } - - /** - * - * This is true when some code other than the SubscriptionManager - * is maintaining subscriptions for this message client. It ensures - * that we have this MessageClient kept around until the session - * expires. - */ - public void setRegistered(boolean reg) - { - registered = reg; - } - - /** - * - */ - public boolean isRegistered() - { - return registered; - } - - /** - * Adds a MessageClient destroy listener. - * - * @see flex.messaging.MessageClientListener - * - * @param listener The listener to add. - */ - public void addMessageClientDestroyedListener(MessageClientListener listener) - { - if (listener != null) - { - checkValid(); - - if (destroyedListeners == null) - { - synchronized (lock) - { - if (destroyedListeners == null) - destroyedListeners = new CopyOnWriteArrayList(); - } - } - - destroyedListeners.addIfAbsent(listener); - } - } - - /** - * Removes a MessageClient destroyed listener. - * - * @see flex.messaging.MessageClientListener - * - * @param listener The listener to remove. - */ - public void removeMessageClientDestroyedListener(MessageClientListener listener) - { - // No need to check validity; removing a listener is always ok. - if (listener != null && destroyedListeners != null) - destroyedListeners.remove(listener); - } - - /** - * - * Adds a subscription to the subscription set for this MessageClient. - * - * @param selector The selector expression used for the subscription. - * @param subtopic The subtopic used for the subscription. 
- * @param maxFrequency The maximum number of messages the client wants to - * receive per second (0 disables this limit). - */ - public void addSubscription(String selector, String subtopic, int maxFrequency) - { - synchronized (lock) - { - checkValid(); - - incrementReferences(); - - // Create and add the subscription to the subscriptions set. - SubscriptionInfo si = new SubscriptionInfo(selector, subtopic, maxFrequency); - subscriptions.add(si); - - registerSubscriptionWithThrottleManager(si); - } - } - - /** - * - * Registers the subscription with the outbound queue processor's throttle - * manager, if one exists. - * - * @param si The subscription info object. - */ - public void registerSubscriptionWithThrottleManager(SubscriptionInfo si) - { - // Register the destination that will setup client level outbound throttling. - ThrottleSettings ts = destination.getNetworkSettings().getThrottleSettings(); - if (ts.getOutboundPolicy() != Policy.NONE && (ts.isOutboundClientThrottleEnabled() || si.maxFrequency > 0)) - { - // Setup the client level outbound throttling, and register the destination - // only if the policy is not NONE, and a throttling limit is specified - // either at the destination or by consumer. - OutboundQueueThrottleManager throttleManager = getThrottleManager(true); - if (throttleManager != null) - throttleManager.registerDestination(destinationId, ts.getOutgoingClientFrequency(), ts.getOutboundPolicy()); - } - else if (si.maxFrequency > 0) // Let the client know that maxFrequency will be ignored. - { - if (Log.isWarn()) - Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).warn("MessageClient with clientId '" - + clientId + "' for destination '" + destinationId - + "' specified a maxFrequency value of '" + si.maxFrequency - + "' but the destination does not define a throttling policy. This value will be ignored."); - } - - // Now, register the subscription. 
- OutboundQueueThrottleManager throttleManager = getThrottleManager(false); - if (throttleManager != null) - throttleManager.registerSubscription(destinationId, si); - } - - /** - * - * Removes a subscription from the subscription set for this MessageClient. - * - * @param selector The selector expression for the subscription. - * @param subtopic The subtopic for the subscription. - * @return true if no subscriptions remain for this MessageClient; otherwise false. - */ - public boolean removeSubscription(String selector, String subtopic) - { - synchronized (lock) - { - SubscriptionInfo si = new SubscriptionInfo(selector, subtopic); - if (subscriptions.remove(si)) - { - unregisterSubscriptionWithThrottleManager(si); - return decrementReferences(); - } - else if (Log.isError()) - { - Log.getLogger(MessageService.LOG_CATEGORY).error("Error - unable to find subscription to remove for MessageClient: " - + clientId + " selector: " + selector + " subtopic: " + subtopic); - } - return numReferences == 0; - } - } - - /** - * - * We use the same MessageClient for more than one subscription with different - * selection criteria. This tracks the number of subscriptions that are active - * so that we know when we are finished. - */ - public void incrementReferences() - { - synchronized (lock) - { - numReferences++; - } - } - - /** - * - * Decrements the numReferences variable and returns true if this was the last reference. - */ - public boolean decrementReferences() - { - synchronized (lock) - { - if (--numReferences == 0) - { - cancelTimeout(); - if (destination instanceof MessageDestination) - { - MessageDestination msgDestination = (MessageDestination)destination; - if (msgDestination.getThrottleManager() != null) - msgDestination.getThrottleManager().removeClientThrottleMark(clientId); - } - return true; - } - return false; - } - } - - /** - * - * Invoked by SubscriptionManager once the subscription state is setup completely - * for the MessageClient.. 
- */
    public void notifyCreatedListeners()
    {
        // Notify MessageClient created listeners, passing this instance to each one.
        if (!createdListeners.isEmpty())
        {
            // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
            for (Iterator iter = createdListeners.iterator(); iter.hasNext();)
                ((MessageClientListener)iter.next()).messageClientCreated(this);
        }
    }

    /**
     * Invoked by SubscriptionManager while handling a subscribe request.
     * If the request is updating an existing subscription the 'push' state in the associated FlexClient
     * may need to be updated to ensure that the correct endpoint is used for this subscription.
     * <p>
     * The endpoint id and session are swapped while holding the lock; the
     * unregister/register calls on the FlexClient and the sessions are made
     * after the lock has been released. Nothing happens when any of the current
     * or new endpoint id / session values is null, or when neither has changed.
     *
     * @param newEndpointId The id for the new endpoint that the subscription may have failed over to.
     */
    public void resetEndpoint(String newEndpointId)
    {
        String oldEndpointId = null;
        FlexSession oldSession = null;
        // The session to switch to is the one bound to the current FlexContext.
        FlexSession newSession = FlexContext.getFlexSession();
        synchronized (lock)
        {
            // If anything is null, or nothing has changed, no need for a reset.
            if (endpointId == null || newEndpointId == null || flexSession == null || newSession == null || (endpointId.equals(newEndpointId) && flexSession.equals(newSession)))
                return;

            // Remember the previous values for the cleanup and the debug log below.
            oldEndpointId = endpointId;
            endpointId = newEndpointId;

            oldSession = flexSession;
            flexSession = newSession;
        }

        // Unregister in order to reset the proper push settings in the re-registration below once the session association has been patched.
        if (flexClient != null)
            flexClient.unregisterMessageClient(this);

        // Clear out any reference to this subscription that the previously associated session has.
        if (oldSession != null)
            oldSession.unregisterMessageClient(this);

        // Associate the current session with this subscription.
        if (flexSession != null)
            flexSession.registerMessageClient(this);

        // Reset proper push settings.
        if (flexClient != null)
            flexClient.registerMessageClient(this);

        if (Log.isDebug())
        {
            // Only mention the parts (endpoint and/or session) that actually changed.
            String msg = "MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been reset as a result of a resubscribe.";
            if (oldEndpointId != null && !oldEndpointId.equals(newEndpointId))
                msg += " Endpoint change [" + oldEndpointId + " -> " + newEndpointId + "]";
            if ((oldSession != null) && (newSession != null) && (oldSession != newSession)) // Test identity.
                msg += " FlexSession change [" + oldSession.getClass().getName() + ":" + oldSession.getId() + " -> " + newSession.getClass().getName() + ":" + newSession.getId() + "]";

            Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug(msg);
        }
    }

    /**
     * Used to test whether this client should receive this message
     * based on the list of subscriptions we have recorded for it.
     * It must match both the subtopic and the selector expression.
     * Usually this is done by the subscription manager - this logic is
     * only here to maintain api compatibility with one of the variants
     * of the pushMessageToClients which has subscriberIds and an evalSelector
     * property.
     *
     * @param message The message to test; its subtopic is read from the
     * <code>AsyncMessage.SUBTOPIC_HEADER_NAME</code> header.
     * @param destination The destination whose server settings supply the subtopic separator.
     * @return true if at least one recorded subscription matches the message; otherwise false.
     */
    public boolean testMessage(Message message, MessageDestination destination)
    {
        String subtopic = (String) message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
        String subtopicSeparator = destination.getServerSettings().getSubtopicSeparator();
        synchronized (lock)
        {
            // The first matching subscription is enough.
            for (SubscriptionInfo si : subscriptions)
            {
                if (si.matches(message, subtopic, subtopicSeparator))
                    return true;
            }
        }
        return false;
    }

    /**
     * Returns true if the MessageClient is valid; false if it has been invalidated.
     * The flag is read while holding the lock.
     *
     * @return true if the MessageClient is valid; otherwise false.
     */
    public boolean isValid()
    {
        synchronized (lock)
        {
            return valid;
        }
    }

    /** - * Invalidates the MessageClient.
- */
    public void invalidate()
    {
        invalidate(false /* don't attempt to notify the client */);
    }

    /**
     * Invalidates the MessageClient, and optionally attempts to notify the client that
     * this subscription has been invalidated.
     * This overload is used when a subscription is timed out while the client is still
     * actively connected to the server but should also be used by any custom code on the server
     * that invalidates MessageClients but wishes to notify the client cleanly.
     * <p>
     * Steps, in order: claim the shutdown under the lock and cancel the timeout;
     * optionally push a subscription-invalidate command to the client; notify and
     * clear the destroyed listeners; send one unsubscribe command per remaining
     * subscription to the destination's service; release the registration held by
     * the subscription manager (if any); finally mark the instance invalid.
     * A call made while the instance is already invalid or being invalidated
     * returns immediately.
     *
     * @param notifyClient <code>true</code> to notify the client that its subscription has been
     *                     invalidated.
     */
    public void invalidate(boolean notifyClient)
    {
        synchronized (lock)
        {
            if (!valid || invalidating)
                return; // Already shutting down.

            invalidating = true; // This thread gets to shut the MessageClient down.
            cancelTimeout();
        }

        // Record whether we're attempting to notify the client or not.
        attemptingInvalidationClientNotification = notifyClient;

        // Build a subscription invalidation message and push to the client if it is still valid.
        if (notifyClient && flexClient != null && flexClient.isValid())
        {
            CommandMessage msg = new CommandMessage();
            msg.setDestination(destination.getId());
            msg.setClientId(clientId);
            msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);
            // The push is addressed to this client only.
            Set subscriberIds = new TreeSet();
            subscriberIds.add(clientId);
            try
            {
                if (destination instanceof MessageDestination)
                {
                    MessageDestination msgDestination = (MessageDestination)destination;
                    ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, msg, false /* don't eval selector */);
                }
            }
            // Notification is best effort: a failed push must not stop the invalidation.
            catch (MessageException ignore) {}
        }

        // Notify messageClientDestroyed listeners that we're being invalidated.
        if (destroyedListeners != null && !destroyedListeners.isEmpty())
        {
            for (Iterator iter = destroyedListeners.iterator(); iter.hasNext();)
            {
                ((MessageClientListener)iter.next()).messageClientDestroyed(this);
            }
            // Listeners are told once; drop them afterwards.
            destroyedListeners.clear();
        }

        // And generate unsubscribe messages for all of the MessageClient's subscriptions and
        // route them to the destination this MessageClient is subscribed to.
        // The reason we send a message to the service rather than just going straight to the SubscriptionManager
        // is that some adapters manage their own subscription state (i.e. JMS) in addition to us keeping track of
        // things with our SubscriptionManager.
        ArrayList<CommandMessage> unsubMessages = new ArrayList<CommandMessage>();
        synchronized (lock)
        {
            // Only build the messages while holding the lock; they are sent after it is released.
            for (SubscriptionInfo subInfo : subscriptions)
            {
                CommandMessage unsubMessage = new CommandMessage();
                unsubMessage.setDestination(destination.getId());
                unsubMessage.setClientId(clientId);
                unsubMessage.setOperation(CommandMessage.UNSUBSCRIBE_OPERATION);
                unsubMessage.setHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER, Boolean.TRUE);
                unsubMessage.setHeader(CommandMessage.SELECTOR_HEADER, subInfo.selector);
                unsubMessage.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subInfo.subtopic);
                unsubMessages.add(unsubMessage);
            }
        }
        // Release the lock and send the unsub messages.
        for (CommandMessage unsubMessage : unsubMessages)
        {
            try
            {
                destination.getService().serviceCommand(unsubMessage);
            }
            catch (MessageException me)
            {
                // A rejected unsubscribe is logged at debug level and does not abort the shutdown.
                if (Log.isDebug())
                    Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " issued an unsubscribe message during invalidation that was not processed but will continue with invalidation. Reason: " + ExceptionUtil.toString(me));
            }
        }

        synchronized (lock)
        {
            // If we didn't clean up all subscriptions log an error and continue with shutdown.
            int remainingSubscriptionCount = subscriptions.size();
            if (remainingSubscriptionCount > 0 && Log.isError())
                Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).error("MessageClient: " + getClientId() + " failed to remove " + remainingSubscriptionCount + " subscription(s) during invalidation");
        }

        // If someone registered this message client, invalidating it will free
        // their reference which will typically also remove this message client.
        if (registered && destination instanceof MessageDestination)
            ((MessageDestination)destination).getSubscriptionManager().releaseMessageClient(this);

        synchronized (lock)
        {
            // Shutdown finished: the instance is now invalid and no longer "invalidating".
            valid = false;
            invalidating = false;
        }

        if (Log.isDebug())
            Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been invalidated.");
    }

    /**
     * Pushes the supplied message and then invalidates the MessageClient.
     * The message's destination and clientId are overwritten with this
     * MessageClient's values before it is pushed. The push is only attempted when
     * the destination is a MessageDestination, and a MessageException raised by
     * the push is ignored.
     *
     * @param message The message to push to the client before invalidating.
     * When message is null, MessageClient is invalidated silently.
     */
    public void invalidate(Message message)
    {
        if (message != null)
        {
            message.setDestination(destination.getId());
            message.setClientId(clientId);

            // The push is addressed to this client only.
            Set subscriberIds = new TreeSet();
            subscriberIds.add(clientId);
            try
            {
                if (destination instanceof MessageDestination)
                {
                    MessageDestination msgDestination = (MessageDestination)destination;
                    ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, message, false /* don't eval selector */);
                }
            }
            // Best effort: a failed push must not prevent the invalidation below.
            catch (MessageException ignore) {}

            invalidate(true /* attempt to notify remote client */);
        }
        else
        {
            invalidate();
        }
    }

    /** - * - * Compares this MessageClient to the specified object. The result is true if - * the argument is not null and is a MessageClient instance with a matching - * clientId value. - * - * @param o The object to compare this MessageClient to.
- * @return true if the MessageClient is equal; otherwise false. - */ - public boolean equals(Object o) - { - if (o instanceof MessageClient) - { - MessageClient c = (MessageClient) o; - if (c != null && c.getClientId().equals(clientId)) - return true; - } - return false; - } - - /** - * - * Returns the hash code for this MessageClient. The returned value is - * the hash code for the MessageClient's clientId property. - * - * @return The hash code value for this MessageClient. - */ - @Override - public int hashCode() - { - return getClientId().hashCode(); - } - - /** - * - * The String representation of this MessageClient is returned (its clientId value). - * - * @return The clientId value for this MessageClient. - */ - @Override - public String toString() - { - return String.valueOf(clientId); - } - - //---------------------------------- - // TimeoutAbstractObject overrides - //---------------------------------- - - /** - * - * Implements TimeoutCapable. - * This method returns the timeout value configured for the MessageClient's destination. - */ - @Override - public long getTimeoutPeriod() - { - return (destination instanceof MessageDestination) ? - ((MessageDestination)destination).getSubscriptionManager().getSubscriptionTimeoutMillis() : 0; - } - - /** - * - * Implements TimeoutCapable. - * This method is invoked when the MessageClient has timed out and it - * invalidates the MessageClient. - */ - public void timeout() - { - invalidate(true /* notify client */); - } - - /** - * - * Returns true if a timeout task is running for this MessageClient. - */ - public boolean isTimingOut() - { - return willTimeout; - } - - /** - * - * Records whether a timeout task is running for this MessageClient. 
- */
    public void setTimingOut(boolean value)
    {
        willTimeout = value;
    }

    //--------------------------------------------------------------------------
    //
    // Private Methods
    //
    //--------------------------------------------------------------------------

    /**
     * Utility method that tests validity and throws an exception if the instance
     * has been invalidated. The flag is read while holding the lock.
     */
    private void checkValid()
    {
        synchronized (lock)
        {
            if (!valid)
            {
                throw new RuntimeException("MessageClient has been invalidated."); // TODO - localize
            }
        }
    }

    /**
     * Looks up the throttle manager of the FlexClient's outbound queue processor
     * for this MessageClient's endpoint.
     *
     * @param create true to create the throttle manager when the processor has none yet;
     * false to return only an existing one.
     * @return The throttle manager, or null when there is no FlexClient or no
     * outbound queue processor for the endpoint.
     */
    private OutboundQueueThrottleManager getThrottleManager(boolean create)
    {
        if (flexClient != null)
        {
            FlexClientOutboundQueueProcessor processor = flexClient.getOutboundQueueProcessor(endpointId);
            if (processor != null)
                return create? processor.getOrCreateOutboundQueueThrottleManager() : processor.getOutboundQueueThrottleManager();
        }
        return null;
    }

    /**
     * Unregisters the subscription from the outbound throttle manager, if one
     * already exists (none is created for this purpose).
     *
     * @param si The subscription info object.
     */
    private void unregisterSubscriptionWithThrottleManager(SubscriptionInfo si)
    {
        OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
        if (throttleManager != null)
            throttleManager.unregisterSubscription(destinationId, si);
    }

    //--------------------------------------------------------------------------
    //
    // Nested Classes
    //
    //--------------------------------------------------------------------------

    /**
     * Represents a MessageClient's subscription to a destination.
     * It captures the optional selector expression and subtopic for the
     * subscription. Equality, hashing and ordering consider only the selector
     * and the subtopic; <code>maxFrequency</code> is not part of the identity.
     */
    public static class SubscriptionInfo implements Comparable
    {
        public String selector, subtopic;
        public int maxFrequency; // maxFrequency per subscription. Not used in BlazeDS.

        /**
         * Creates a subscription info with a maxFrequency of 0.
         *
         * @param sel the selector expression, may be null
         * @param sub the subtopic, may be null
         */
        public SubscriptionInfo(String sel, String sub)
        {
            this(sel, sub, 0);
        }

        /**
         * Creates a subscription info.
         *
         * @param sel the selector expression, may be null
         * @param sub the subtopic, may be null
         * @param maxFrequency the maximum message frequency requested for this subscription
         */
        public SubscriptionInfo(String sel, String sub, int maxFrequency)
        {
            this.selector = sel;
            this.subtopic = sub;
            this.maxFrequency = maxFrequency;
        }

        @Override
        public boolean equals(Object o)
        {
            if (o instanceof SubscriptionInfo)
            {
                SubscriptionInfo other = (SubscriptionInfo) o;
                return equalStrings(other.selector, selector) &&
                       equalStrings(other.subtopic, subtopic);
            }
            return false;
        }

        @Override
        public int hashCode()
        {
            return (selector == null ? 0 : selector.hashCode()) +
                   (subtopic == null ? 1 : subtopic.hashCode());
        }

        /**
         * Compares the two subscription infos (being careful to
         * ensure we compare in a consistent way if the arguments
         * are switched). The selector is compared first, then the subtopic.
         * @param o the object to compare
         * @return int the compare result
         */
        public int compareTo(Object o)
        {
            SubscriptionInfo other = (SubscriptionInfo) o;
            int result;

            if ((result = compareStrings(other.selector, selector)) != 0)
                return result;
            else if ((result = compareStrings(other.subtopic, subtopic)) != 0)
                return result;

            return 0;
        }

        /**
         * Checks whether the message matches this subscription's subtopic and selector.
         * A selector that fails to evaluate is logged as a warning and treated as "no match".
         * @param message current message
         * @param subtopicToMatch subtopic string of the message
         * @param subtopicSeparator subtopic separator
         * @return true if the message matches the subtopic and, when one is set, the selector
         */
        public boolean matches(Message message, String subtopicToMatch, String subtopicSeparator)
        {
            if ((subtopicToMatch == null && subtopic != null) || (subtopicToMatch != null && subtopic == null))
                return false; // If either defines a subtopic, they both must define one.

            // If both define a subtopic, they must match.
            if (subtopicToMatch != null && subtopic != null)
            {
                Subtopic consumerSubtopic = new Subtopic(subtopic, subtopicSeparator);
                Subtopic messageSubtopic = new Subtopic(subtopicToMatch, subtopicSeparator);
                if (!consumerSubtopic.matches(messageSubtopic))
                    return false; // Not a match.
            }

            // No selector means the subtopic check alone decides.
            if (selector == null)
                return true;

            JMSSelector jmsSelector = new JMSSelector(selector);
            try
            {
                if (jmsSelector.match(message))
                    return true;
            }
            catch (JMSSelectorException jmse)
            {
                // Log a warning for this client's selector and continue
                if (Log.isWarn())
                {
                    Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error processing message selector: " +
                            jmse.toString() + StringUtils.NEWLINE +
                            " incomingMessage: " + message + StringUtils.NEWLINE +
                            " selector: " + selector + StringUtils.NEWLINE);
                }
            }
            return false;
        }

        /**
         * Returns a String representation of the subscription info: the subtopic
         * and the selector, each on its own line, followed by the maxFrequency
         * when it is greater than zero.
         * @return String the string representation of the subscription info
         */
        @Override
        public String toString()
        {
            // Local, single-threaded buffer: StringBuilder avoids StringBuffer's locking.
            StringBuilder sb = new StringBuilder();
            sb.append("Subtopic: ").append(subtopic).append(StringUtils.NEWLINE);
            sb.append("Selector: ").append(selector).append(StringUtils.NEWLINE);
            if (maxFrequency > 0)
                sb.append("maxFrequency: ").append(maxFrequency);
            return sb.toString();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/MessageClientListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageClientListener.java b/core/src/flex/messaging/MessageClientListener.java deleted file mode 100644 index 7cfdb90..0000000 --- a/core/src/flex/messaging/MessageClientListener.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package flex.messaging;

/**
 * Interface to be notified when a MessageClient is created or destroyed. Implementations of this interface
 * may add themselves as listeners statically via <code>MessageClient.addMessageClientCreatedListener()</code>.
 * To listen for MessageClient destruction, the implementation class instance must add itself as a listener to
 * a specific MessageClient instance via the <code>addMessageClientDestroyedListener()</code> method.
 */
public interface MessageClientListener
{
    /**
     * Notification that a MessageClient was created.
     *
     * @param messageClient The MessageClient that was created.
     */
    void messageClientCreated(MessageClient messageClient);

    /**
     * Notification that a MessageClient is about to be destroyed.
     * The MessageClient passed in is the instance that is being invalidated.
     *
     * @param messageClient The MessageClient that will be destroyed.
     */
    void messageClientDestroyed(MessageClient messageClient);
}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/MessageDestination.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageDestination.java b/core/src/flex/messaging/MessageDestination.java deleted file mode 100644 index 141ac8a..0000000 --- a/core/src/flex/messaging/MessageDestination.java +++ /dev/null @@ -1,491 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package flex.messaging; - -import flex.management.runtime.messaging.MessageDestinationControl; -import flex.management.runtime.messaging.services.messaging.SubscriptionManagerControl; -import flex.management.runtime.messaging.services.messaging.ThrottleManagerControl; -import flex.messaging.config.ConfigurationConstants; -import flex.messaging.config.ConfigurationException; -import flex.messaging.config.DestinationSettings; -import flex.messaging.config.ThrottleSettings; -import flex.messaging.config.ConfigMap; -import flex.messaging.config.NetworkSettings; -import flex.messaging.config.ServerSettings; -import flex.messaging.config.ThrottleSettings.Policy; -import flex.messaging.log.LogCategories; -import flex.messaging.services.MessageService; -import flex.messaging.services.Service; -import flex.messaging.services.messaging.SubscriptionManager; -import flex.messaging.services.messaging.RemoteSubscriptionManager; -import flex.messaging.services.messaging.ThrottleManager; -import flex.messaging.services.messaging.MessagingConstants; -import flex.messaging.util.ClassUtil; - -/** - * A logical reference to a MessageDestination. 
- */ -public class MessageDestination extends FactoryDestination -{ - static final long serialVersionUID = -2016911808141319012L; - - /** Log category for <code>MessageDestination</code>.*/ - public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE; - - // Errors - private static final int UNSUPPORTED_POLICY = 10124; - - // Destination properties - private transient ServerSettings serverSettings; - - // Destination internal - private transient SubscriptionManager subscriptionManager; - private transient RemoteSubscriptionManager remoteSubscriptionManager; - private transient ThrottleManager throttleManager; - - private transient MessageDestinationControl controller; - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>MessageDestination</code> instance. - */ - public MessageDestination() - { - this(false); - } - - /** - * Constructs a <code>MessageDestination</code> with the indicated management. - * - * @param enableManagement <code>true</code> if the <code>MessageDestination</code> - * is manageable; otherwise <code>false</code>. - */ - public MessageDestination(boolean enableManagement) - { - super(enableManagement); - - serverSettings = new ServerSettings(); - - // Managers - subscriptionManager = new SubscriptionManager(this); - remoteSubscriptionManager = new RemoteSubscriptionManager(this); - } - - //-------------------------------------------------------------------------- - // - // Initialize, validate, start, and stop methods. - // - //-------------------------------------------------------------------------- - - /** - * Initializes the <code>MessageDestination</code> with the properties. - * If subclasses override, they must call <code>super.initialize()</code>. - * - * @param id The id of the destination. 
- * @param properties Properties for the <code>MessageDestination</code>. - */ - @Override - public void initialize(String id, ConfigMap properties) - { - super.initialize(id, properties); - - if (properties == null || properties.size() == 0) - return; - - // Network properties - network(properties); - - // Server properties - server(properties); - } - - /** - * Sets up the throttle manager before it starts. - */ - @Override - public void start() - { - // Create the throttle manager, only if needed. - if (networkSettings.getThrottleSettings() != null) - { - ThrottleSettings settings = networkSettings.getThrottleSettings(); - if (settings.isClientThrottleEnabled() || settings.isDestinationThrottleEnabled()) - { - settings.setDestinationName(getId()); - throttleManager = createThrottleManager(); - throttleManager.setThrottleSettings(settings); - throttleManager.start(); - } - } - super.start(); - } - - /** - * Stops the subscription, remote subscription, and throttle managers and - * then calls super class's stop. - */ - @Override - public void stop() - { - if (isStarted()) - { - subscriptionManager.stop(); - remoteSubscriptionManager.stop(); - if (throttleManager != null) - throttleManager.stop(); - } - super.stop(); - } - - //-------------------------------------------------------------------------- - // - // Public Getters and Setters for Destination properties - // - //-------------------------------------------------------------------------- - - /** - * Sets the <code>NetworkSettings</code> of the <code>MessageDestination</code>. - * - * @param networkSettings The <code>NetworkSettings</code> of the <code>MessageDestination</code> - */ - @Override - public void setNetworkSettings(NetworkSettings networkSettings) - { - super.setNetworkSettings(networkSettings); - - // Set the subscription manager settings if needed. 
- if (networkSettings.getSubscriptionTimeoutMinutes() > 0) - { - long subscriptionTimeoutMillis = networkSettings.getSubscriptionTimeoutMinutes() * 60L * 1000L; // Convert to millis. - subscriptionManager.setSubscriptionTimeoutMillis(subscriptionTimeoutMillis); - } - } - - /** - * Returns the <code>ServerSettings</code> of the <code>MessageDestination</code>. - * - * @return The <code>ServerSettings</code> of the <code>MessageDestination</code>. - */ - public ServerSettings getServerSettings() - { - return serverSettings; - } - - /** - * Sets the <code>ServerSettings</code> of the <code>MessageDestination</code>. - * - * @param serverSettings The <code>ServerSettings</code> of the <code>MessageDestination</code> - */ - public void setServerSettings(ServerSettings serverSettings) - { - this.serverSettings = serverSettings; - } - - /** - * Casts the <code>Service</code> into <code>MessageService</code> - * and calls super.setService. - * - * @param service The <code>Service</code> managing this <code>Destination</code>. - */ - @Override - public void setService(Service service) - { - MessageService messageService = (MessageService)service; - super.setService(messageService); - } - - //-------------------------------------------------------------------------- - // - // Other Public Methods - // - //-------------------------------------------------------------------------- - - /** - * - * Returns a <tt>ConfigMap</tt> of destination properties that the client - * needs. This includes properties from <code>super{@link #describeDestination(boolean)}</code> - * and it also includes outbound throttling policy that the edge server might need. - * - * @param onlyReliable Determines whether only reliable destination configuration should be returned. - * @return A <tt>ConfigMap</tt> of destination properties that the client needs. 
- */ - @Override - public ConfigMap describeDestination(boolean onlyReliable) - { - ConfigMap destinationConfig = super.describeDestination(onlyReliable); - if (destinationConfig == null) - return null; - - if (throttleManager == null) - return destinationConfig; - - Policy outboundPolicy = throttleManager.getOutboundPolicy(); - if (outboundPolicy == null || outboundPolicy == Policy.NONE) - return destinationConfig; - - // Add the outbound throttle policy to network properties section as appropriate. - ConfigMap properties = destinationConfig.getPropertyAsMap(ConfigurationConstants.PROPERTIES_ELEMENT, null); - if (properties == null) - { - properties = new ConfigMap(); - destinationConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties); - } - - ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null); - if (network == null) - { - network = new ConfigMap(); - properties.addProperty(NetworkSettings.NETWORK_ELEMENT, network); - } - - ConfigMap throttleOutbound = new ConfigMap(); - throttleOutbound.addProperty(ThrottleSettings.ELEMENT_POLICY, throttleManager.getOutboundPolicy().toString()); - network.addProperty(ThrottleSettings.ELEMENT_OUTBOUND, throttleOutbound); - - return destinationConfig; - } - - - public SubscriptionManager getSubscriptionManager() - { - return subscriptionManager; - } - - - public RemoteSubscriptionManager getRemoteSubscriptionManager() - { - return remoteSubscriptionManager; - } - - - public ThrottleManager getThrottleManager() - { - return throttleManager; - } - - - @Override - public boolean equals(Object o) - { - if (o instanceof Destination) - { - Destination d = (Destination)o; - String serviceType1 = d.getServiceType(); - String serviceType2 = getServiceType(); - if ((serviceType1 == null && serviceType2 == null) || (serviceType1 != null && serviceType1.equals(serviceType2))) - { - String id1 = d.getId(); - String id2 = getId(); - if ((id1 == null && id2 == null) || (id1 != null && 
id1.equals(id2))) - return true; - } - } - return false; - } - - - @Override - public int hashCode() - { - return (getServiceType() == null ? 0 : getServiceType().hashCode()) * 100003 + - (getId() == null ? 0 : getId().hashCode()); - } - - - @Override - public String toString() - { - return getServiceType() + "#" + getId(); - } - - //-------------------------------------------------------------------------- - // - // Protected/Private Methods - // - //-------------------------------------------------------------------------- - - protected ThrottleManager createThrottleManager() - { - Service service = getService(); - if (service == null || service.getMessageBroker() == null) - return new ThrottleManager(); // Return the default. - - try - { - Class<? extends ThrottleManager> throttleManagerClass = service.getMessageBroker().getThrottleManagerClass(); - Object instance = ClassUtil.createDefaultInstance(throttleManagerClass, null); - if (instance instanceof ThrottleManager) - return (ThrottleManager)instance; - } - catch (Throwable t) - { - // NOWARN - } - - return new ThrottleManager(); // Return the default. - } - - protected void network(ConfigMap properties) - { - ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null); - if (network == null) - return; - - // Get implementation specific network settings, including subclasses! - NetworkSettings ns = getNetworkSettings(); - - // Subscriber timeout; first check for subscription-timeout-minutes and fallback to legacy session-timeout. 
- int useLegacyPropertyToken = -999999; - int subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SUBSCRIPTION_TIMEOUT_MINUTES, useLegacyPropertyToken); - if (subscriptionTimeoutMinutes == useLegacyPropertyToken) - subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SESSION_TIMEOUT, NetworkSettings.DEFAULT_TIMEOUT); - ns.setSubscriptionTimeoutMinutes(subscriptionTimeoutMinutes); - - // Throttle Settings - ThrottleSettings ts = ns.getThrottleSettings(); - ts.setDestinationName(getId()); - throttle(ts, network); - - setNetworkSettings(ns); - } - - protected void throttle(ThrottleSettings ts, ConfigMap network) - { - ConfigMap inbound = network.getPropertyAsMap(ThrottleSettings.ELEMENT_INBOUND, null); - if (inbound != null) - { - ThrottleSettings.Policy policy = getPolicyFromThrottleSettings(inbound); - ts.setInboundPolicy(policy); - int destFreq = inbound.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0); - ts.setIncomingDestinationFrequency(destFreq); - int clientFreq = inbound.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0); - ts.setIncomingClientFrequency(clientFreq); - } - - ConfigMap outbound = network.getPropertyAsMap(ThrottleSettings.ELEMENT_OUTBOUND, null); - if (outbound != null) - { - ThrottleSettings.Policy policy = getPolicyFromThrottleSettings(outbound); - ts.setOutboundPolicy(policy); - int destFreq = outbound.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0); - ts.setOutgoingDestinationFrequency(destFreq); - int clientFreq = outbound.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0); - ts.setOutgoingClientFrequency(clientFreq); - } - } - - private ThrottleSettings.Policy getPolicyFromThrottleSettings(ConfigMap settings) - { - String policyString = settings.getPropertyAsString(ThrottleSettings.ELEMENT_POLICY, null); - ThrottleSettings.Policy policy = ThrottleSettings.Policy.NONE; - if (policyString == null) - return policy; - try - { - policy = ThrottleSettings.parsePolicy(policyString); 
- } - catch (ConfigurationException exception) - { - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(UNSUPPORTED_POLICY, new Object[] {getId(), policyString}); - throw ce; - } - return policy; - } - - protected void server(ConfigMap properties) - { - ConfigMap server = properties.getPropertyAsMap(DestinationSettings.SERVER_ELEMENT, null); - if (server == null) - return; - - long ttl = server.getPropertyAsLong(MessagingConstants.TIME_TO_LIVE_ELEMENT, -1); - serverSettings.setMessageTTL(ttl); - - boolean durable = server.getPropertyAsBoolean(MessagingConstants.IS_DURABLE_ELEMENT, false); - serverSettings.setDurable(durable); - - boolean allowSubtopics = server.getPropertyAsBoolean(MessagingConstants.ALLOW_SUBTOPICS_ELEMENT, false); - serverSettings.setAllowSubtopics(allowSubtopics); - - boolean disallowWildcardSubtopics = server.getPropertyAsBoolean(MessagingConstants.DISALLOW_WILDCARD_SUBTOPICS_ELEMENT, false); - serverSettings.setDisallowWildcardSubtopics(disallowWildcardSubtopics); - - int priority = server.getPropertyAsInt(MessagingConstants.MESSAGE_PRIORITY, -1); - if (priority != -1) - serverSettings.setPriority(priority); - - String subtopicSeparator = server.getPropertyAsString(MessagingConstants.SUBTOPIC_SEPARATOR_ELEMENT, MessagingConstants.DEFAULT_SUBTOPIC_SEPARATOR); - serverSettings.setSubtopicSeparator(subtopicSeparator); - - String routingMode = server.getPropertyAsString(MessagingConstants.CLUSTER_MESSAGE_ROUTING, "server-to-server"); - serverSettings.setBroadcastRoutingMode(routingMode); - } - - /** - * Returns the log category of the <code>MessageDestination</code>. - * - * @return The log category of the component. - */ - @Override - protected String getLogCategory() - { - return LOG_CATEGORY; - } - - /** - * Invoked automatically to allow the <code>MessageDestination</code> to setup its corresponding - * MBean control. - * - * @param service The <code>Service</code> that manages this <code>MessageDestination</code>. 
- */ - @Override - protected void setupDestinationControl(Service service) - { - controller = new MessageDestinationControl(this, service.getControl()); - controller.register(); - setControl(controller); - setupThrottleManagerControl(controller); - setupSubscriptionManagerControl(controller); - } - - protected void setupThrottleManagerControl(MessageDestinationControl destinationControl) - { - if (throttleManager != null) - { - ThrottleManagerControl throttleManagerControl = new ThrottleManagerControl(throttleManager, destinationControl); - throttleManagerControl.register(); - throttleManager.setControl(throttleManagerControl); - throttleManager.setManaged(true); - destinationControl.setThrottleManager(throttleManagerControl.getObjectName()); - } - } - - private void setupSubscriptionManagerControl(MessageDestinationControl destinationControl) - { - SubscriptionManagerControl subscriptionManagerControl = new SubscriptionManagerControl(getSubscriptionManager(), destinationControl); - subscriptionManagerControl.register(); - getSubscriptionManager().setControl(subscriptionManagerControl); - getSubscriptionManager().setManaged(true); - destinationControl.setSubscriptionManager(subscriptionManagerControl.getObjectName()); - } -}
