http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/HttpFlexSessionProvider.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/HttpFlexSessionProvider.java b/core/src/flex/messaging/HttpFlexSessionProvider.java deleted file mode 100644 index abdcb4c..0000000 --- a/core/src/flex/messaging/HttpFlexSessionProvider.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpSession; - -import flex.messaging.log.Log; - -/** - * Provider implementation for <code>HttpFlexSession</code>s. - * Not intended for public use. - */ -public class HttpFlexSessionProvider extends AbstractFlexSessionProvider -{ - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Factory method to get an existing <tt>HttpFlexSession</tt> for the current request, - * or create and return a new <code>HttpFlexSession</code> if necessary. 
- * The <code>HttpFlexSession</code> wraps the underlying J2EE <code>HttpSession</code>. - * Not intended for public use. - * - * @param request The current <tt>HttpServletRequest</tt>. - * @return A <tt>HttpFlexSession</tt>. - */ - public HttpFlexSession getOrCreateSession(HttpServletRequest request) - { - HttpFlexSession flexSession; - HttpSession httpSession = request.getSession(true); - - if (!HttpFlexSession.isHttpSessionListener && !HttpFlexSession.warnedNoEventRedispatch) - { - HttpFlexSession.warnedNoEventRedispatch = true; - if (Log.isWarn()) - Log.getLogger(HttpFlexSession.WARN_LOG_CATEGORY).warn("HttpFlexSession has not been registered as a listener in web.xml for this application so no events will be dispatched to FlexSessionAttributeListeners or FlexSessionBindingListeners. To correct this, register flex.messaging.HttpFlexSession as a listener in web.xml."); - } - - boolean isNew = false; - synchronized (httpSession) - { - flexSession = (HttpFlexSession)httpSession.getAttribute(HttpFlexSession.SESSION_ATTRIBUTE); - if (flexSession == null) - { - flexSession = new HttpFlexSession(this); - // Correlate this FlexSession to the HttpSession before triggering any listeners. - FlexContext.setThreadLocalSession(flexSession); - httpSession.setAttribute(HttpFlexSession.SESSION_ATTRIBUTE, flexSession); - flexSession.setHttpSession(httpSession); - isNew = true; - } - else - { - FlexContext.setThreadLocalSession(flexSession); - if (flexSession.httpSession == null) - { - // httpSession is null if the instance is new or is from - // serialization. - flexSession.setHttpSession(httpSession); - isNew = true; - } - } - } - - if (isNew) - { - getFlexSessionManager().registerFlexSession(flexSession); - flexSession.notifyCreated(); - - if (Log.isDebug()) - Log.getLogger(HttpFlexSession.FLEX_SESSION_LOG_CATEGORY).debug("FlexSession created with id '" + flexSession.getId() + "' for an Http-based client connection."); - } - - return flexSession; - } - }
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/MessageBroker.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageBroker.java b/core/src/flex/messaging/MessageBroker.java deleted file mode 100644 index 80b061e..0000000 --- a/core/src/flex/messaging/MessageBroker.java +++ /dev/null @@ -1,2252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging; - -import flex.management.ManageableComponent; -import flex.management.runtime.messaging.MessageBrokerControl; -import flex.management.runtime.messaging.log.LogManager; -import flex.messaging.client.FlexClient; -import flex.messaging.client.FlexClientManager; -import flex.messaging.cluster.ClusterManager; -import flex.messaging.config.ChannelSettings; -import flex.messaging.config.ConfigMap; -import flex.messaging.config.ConfigurationConstants; -import flex.messaging.config.ConfigurationException; -import flex.messaging.config.ConfigurationManager; -import flex.messaging.config.FlexClientSettings; -import flex.messaging.config.SecurityConstraint; -import flex.messaging.config.SecuritySettings; -import flex.messaging.config.SystemSettings; -import flex.messaging.endpoints.AbstractEndpoint; -import flex.messaging.endpoints.Endpoint; -import flex.messaging.endpoints.Endpoint2; -import flex.messaging.factories.JavaFactory; -import flex.messaging.io.BeanProxy; -import flex.messaging.io.PropertyProxyRegistry; -import flex.messaging.log.Log; -import flex.messaging.log.LogCategories; -import flex.messaging.messages.AbstractMessage; -import flex.messaging.messages.AcknowledgeMessage; -import flex.messaging.messages.AsyncMessage; -import flex.messaging.messages.CommandMessage; -import flex.messaging.messages.Message; -import flex.messaging.security.LoginManager; -import flex.messaging.security.SecurityException; -import flex.messaging.services.AbstractService; -import flex.messaging.services.Service; -import flex.messaging.services.ServiceException; -import flex.messaging.services.messaging.ThrottleManager; -import flex.messaging.util.Base64; -import flex.messaging.util.ClassUtil; -import flex.messaging.util.ExceptionUtil; -import flex.messaging.util.RedeployManager; -import flex.messaging.util.StringUtils; -import flex.messaging.util.UUIDGenerator; -import flex.messaging.util.UUIDUtils; -import 
flex.messaging.validators.DeserializationValidator; - -import javax.servlet.ServletContext; -import java.io.IOException; -import java.io.InputStream; -import java.io.UnsupportedEncodingException; -import java.security.Principal; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * The MessageBroker is the hub of message traffic. It has a number of endpoints which send and - * receive messages over the network, and it has a number of - * services that are message destinations. The broker routes - * decoded messages received by endpoints to services based - * on the service destination specified in each message. - * The broker also has a means of pushing messages back through - * endpoints to clients. - */ -public class MessageBroker extends ManageableComponent -{ - //-------------------------------------------------------------------------- - // - // Public Static Constants - // - //-------------------------------------------------------------------------- - - /** - * Id that the AuthenticationService uses to register itself with the broker. - */ - public static final String AUTHENTICATION_SERVICE_ID = "authentication-service"; - - /** - * Localized error messages for <code>MessageBroker</code>. 
- */ - public static final int ERR_MSG_NO_SERVICE_FOR_DEST = 10004; - public static final int ERR_MSG_DESTINATION_UNACCESSIBLE = 10005; - public static final int ERR_MSG_UNKNOWN_REMOTE_CREDENTIALS_FORMAT = 10020; - public static final int ERR_MSG_NULL_MESSAGE_ID = 10029; - public static final int ERR_MSG_CANNOT_SERVICE_STOPPED = 10038; - public static final int ERR_MSG_NULL_ENDPOINT_URL = 10128; - public static final int ERR_MSG_SERVICE_CMD_NOT_SUPPORTED = 10451; - public static final int ERR_MSG_URI_ALREADY_REGISTERED = 11109; - - /** - * Log category for <code>MessageBroker</code>. - */ - public static final String LOG_CATEGORY = LogCategories.MESSAGE_GENERAL; - - /** - * Log category that captures startup information for broker's destinations. - */ - public static final String LOG_CATEGORY_STARTUP_SERVICE = LogCategories.STARTUP_SERVICE; - - - public static final String TYPE = "MessageBroker"; - - //-------------------------------------------------------------------------- - // - // Package Protected Static Constants - // - //-------------------------------------------------------------------------- - /** The default message broker id when one is not specified in web.xml. */ - public static final String DEFAULT_BROKER_ID = "__default__"; - - /** A map of currently available message brokers indexed by message broker id. 
*/ - static final Map<String, MessageBroker> messageBrokers = new HashMap<String, MessageBroker>(); - - //-------------------------------------------------------------------------- - // - // Private Static Constants - // - //-------------------------------------------------------------------------- - - private static final String LOG_MANAGER_ID = "log"; - private static final Integer INTEGER_ONE = 1; - private static final String MESSAGEBROKER = "MessageBroker"; - private static final String ENDPOINT = "Endpoint"; - private static final String SERVICE = "Service"; - - //-------------------------------------------------------------------------- - // - // Constructors - // - //-------------------------------------------------------------------------- - - /** - * - * Create a MessageBroker. This constructor will - * establish collections for routers, endpoints, - * and services. - */ - public MessageBroker() - { - this(true, null); - } - - - public MessageBroker(boolean enableManagement) - { - this(enableManagement, null); - } - - - public MessageBroker(boolean enableManagement, String mbid) - { - this(enableManagement, mbid, MessageBroker.class.getClassLoader()); - } - - - public MessageBroker(boolean enableManagement, String mbid, ClassLoader loader) - { - super(enableManagement); - classLoader = loader; - attributes = new ConcurrentHashMap<String, Object>(); - destinationToService = new ConcurrentHashMap<String, String>(); - endpoints = new LinkedHashMap<String, Endpoint>(); - services = new LinkedHashMap<String, Service>(); - servers = new LinkedHashMap<String, Server>(); - factories = new HashMap<String, FlexFactory>(); - registeredEndpoints = new HashMap<String, String>(); - - // Add the built-in java factory - addFactory("java", new JavaFactory()); - - setId(mbid); - - log = Log.createLog(); - - clusterManager = new ClusterManager(this); - systemSettings = new SystemSettings(); - - if (isManaged()) - { - controller = new MessageBrokerControl(this); - 
controller.register(); - setControl(controller); - - logManager = new LogManager(); - logManager.setLog(log); - logManager.setParent(this); - logManager.setupLogControl(); - logManager.initialize(LOG_MANAGER_ID, null); - } - } - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - private Map<String, Object> attributes; - /** - * Map of attribute ids of Application or Session level scoped destination assemblers - * to the number of active destinations referring to. - */ - private final Map<String, Integer> attributeIdRefCounts = new HashMap<String, Integer>(); - private Map<String, ChannelSettings> channelSettings; - private ClassLoader classLoader; - private ClusterManager clusterManager; - private MessageBrokerControl controller; - private List<String> defaultChannels; - private DeserializationValidator deserializationValidator; - private Map<String, String> destinationToService; // destinationId to serviceId map. 
- private Map<String, Endpoint> endpoints; - private boolean enforceEndpointValidation; - private Map<String, FlexFactory> factories; - private FlexClientManager flexClientManager; - private FlexClientSettings flexClientSettings; - private FlexSessionManager flexSessionManager; - private PathResolver externalPathResolver; - private InternalPathResolver internalPathResolver; - private Log log; - private LogManager logManager; - private LoginManager loginManager; - private RedeployManager redeployManager; - private Map<String, String> registeredEndpoints; - private SecuritySettings securitySettings; - private Map<String, Service> services; - private Map<String, Server> servers; - private final ConcurrentHashMap<String, ServiceValidationListener> serviceValidationListeners = new ConcurrentHashMap<String, ServiceValidationListener>(); - private ServletContext servletContext; - private SystemSettings systemSettings; - private Class<? extends ThrottleManager> throttleManagerClass = ThrottleManager.class; // The default ThrottleManager class. - private UUIDGenerator uuidGenerator; - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Sets the id of the <code>MessageBroker</code>. If id is null, uses the - * default broker id. - * - * - */ - @Override public void setId(String id) - { - if (id == null) - id = DEFAULT_BROKER_ID; - - super.setId(id); - } - - /** - * Retrieves a message broker with the supplied id. This is defined via - * the servlet init parameter messageBrokerId. If no messageBrokerId is supplied, pass - * in a null value for the id parameter. - * - * In case null is passed, the method tries to lookup the broker from current FlexContext. - * If not available, it uses default ID to lookup the message broker. - * - * @param id The id of the message broker to retrieve. 
- * @return The <code>MessageBroker</code> for the supplied id. - */ - public static MessageBroker getMessageBroker(String id) - { - if (id == null) - { - // If available, return the broker from FlexContext - MessageBroker broker = FlexContext.getMessageBroker(); - if (broker != null) - { - return broker; - } - - id = DEFAULT_BROKER_ID; - } - - return messageBrokers.get(id); - } - - /** - * Start the message broker's endpoints and services. - * - */ - @Override public void start() - { - if (isStarted()) - return; - - /* - * J2EE can be a real pain in terms of getting the right class loader so dump out - * some detailed info about what is going on. - */ - if (Log.isDebug()) - { - StringBuffer sb = new StringBuffer(100); - if (classLoader == MessageBroker.class.getClassLoader()) - sb.append(" the MessageBroker's class loader"); - if (classLoader == Thread.currentThread().getContextClassLoader()) - { - if (sb.length() > 0) sb.append(" and"); - sb.append(" the context class loader"); - } - if (sb.length() == 0) - sb.append(" not the context or the message broker's class loader"); - Log.getLogger(LogCategories.CONFIGURATION).debug( - "MessageBroker id: " + getId() + " classLoader is:" + - sb.toString() + " (" + "classLoader " + ClassUtil.classLoaderToString(classLoader)); - } - - // Catch any startup errors and log using our log machinery, then rethrow to trigger shutdown. 
- try - { - // MessageBroker doesn't call super.start() because it doesn't need the - // usual validation that other components need - setStarted(true); - - registerMessageBroker(); - if (flexClientManager == null) - { - flexClientManager = new FlexClientManager(isManaged(), this); - } - flexClientManager.start(); - flexSessionManager = new FlexSessionManager(isManaged(), this); - flexSessionManager.start(); - if (systemSettings == null) - { - systemSettings = new SystemSettings(); - } - startServices(); - loginManager.start(); - startEndpoints(); - startServers(); - redeployManager.start(); - } - catch (Exception e) - { - if (Log.isError()) - Log.getLogger(LogCategories.CONFIGURATION).error("MessageBroker failed to start: " + ExceptionUtil.exceptionFollowedByRootCausesToString(e)); - - // Rethrow. - throw new RuntimeException(e.getMessage(), e); - } - } - - /** - * Stop the broker's endpoints, clusters, and services. - * - */ - @Override public void stop() - { - if (!isStarted()) - return; - - if (Log.isDebug()) - Log.getLogger(LogCategories.CONFIGURATION).debug("MessageBroker stopping: " + getId()); - - serviceValidationListeners.clear(); - - flexSessionManager.stop(); - flexClientManager.stop(); - stopServers(); - stopEndpoints(); - - // set this MB in FlexContext as it is needed for reference counts in destination stopping - FlexContext.setThreadLocalMessageBroker(this); - stopServices(); - FlexContext.setThreadLocalMessageBroker(null); - - if (loginManager != null) - loginManager.stop(); - try - { - if (redeployManager != null) - redeployManager.stop(); - } - catch (Throwable t) - { - t.printStackTrace(); - } - clusterManager.destroyClusters(); - - super.stop(); - unRegisterMessageBroker(); - - // clear static proxy caches - BeanProxy.clear(); - PropertyProxyRegistry.release(); - - // clear system settings - systemSettings.clear(); - systemSettings = null; - - if (Log.isDebug()) - Log.getLogger(LogCategories.CONFIGURATION).debug("MessageBroker stopped: " + 
getId()); - } - - /** - * Returns an <tt>Iterator</tt> containing the current names that attributes have been bound - * to the <tt>MessageBroker</tt> under. - * Use {@link #getAttribute(String)} to retrieve an attribute value. - * - * @return An iterator containing the current names of the attributes. - */ - public Iterator<String> getAttributeNames() - { - return attributes.keySet().iterator(); - } - - /** - * Returns the attribute value bound to the <tt>MessageBroker</tt> under the provided name. - * - * @param name The attribute name. - * @return Object the attribute object - */ - public Object getAttribute(String name) - { - return attributes.get(name); - } - - /** - * Binds an attribute value to the <tt>MessageBroker</tt> under the provided name. - * - * @param name The attribute name. - * @param value The attribute value. - */ - public void setAttribute(String name, Object value) - { - if (value == null) - removeAttribute(name); - else - attributes.put(name, value); - } - - /** - * Removes the attribute with the given name from the <tt>MessageBroker</tt>. - * - * @param name The attribute name. - */ - public void removeAttribute(String name) - { - attributes.remove(name); - } - - /** - * Returns the deserialization validator of the <tt>MessageBroker</tt> or null - * if none exists. - * - * @return The deserialization validator of the <tt>MessageBroker</tt> or null - * if none exists. - */ - public DeserializationValidator getDeserializationValidator() - { - return deserializationValidator; - } - - /** - * Sets the deserialization validator of the <tt>MessageBroker</tt>. - * - * @param deserializationValidator The deserialization validator. 
- */ - public void setDeserializationValidator(DeserializationValidator deserializationValidator) - { - this.deserializationValidator = deserializationValidator; - } - - public void setExternalPathResolver(PathResolver externalPathResolver) - { - this.externalPathResolver = externalPathResolver; - } - - - public void setInternalPathResolver(InternalPathResolver internalPathResolver) - { - this.internalPathResolver = internalPathResolver; - } - - - public InputStream resolveExternalPath(String filename) throws IOException - { - return externalPathResolver.resolve(filename); - } - - - public InputStream resolveInternalPath(String filename) throws IOException - { - return internalPathResolver.resolve(filename); - } - - - public interface PathResolver - { - InputStream resolve(String filename) throws IOException; - } - - /** - * This interface is being kept for backwards compatibility. - * - */ - public interface InternalPathResolver extends PathResolver - { - // No-op. - } - - - public ClusterManager getClusterManager() - { - return clusterManager; - } - - /** - * - * Add a <code>Server</code> to the broker's collection. - * - * @param server <code>Server</code> to be added. 
- */ - public void addServer(Server server) - { - if (server == null) - { - // Cannot add null ''{0}'' to the ''{1}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{"Server", MESSAGEBROKER}); - throw ex; - } - - String id = server.getId(); - - if (id == null) - { - // Cannot add ''{0}'' with null id to the ''{1}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{"Server", MESSAGEBROKER}); - throw ex; - } - - // No need to add if server is already added - Server currentServer = getServer(id); - if (currentServer == server) - return; - - // Do not allow servers with the same id - if (currentServer != null) - { - // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{"Server", id, MESSAGEBROKER}); - throw ex; - } - - servers.put(id, server); - } - - /** - * - * Returns the <code>Server</code> with the specified id. - * - * @param id The id of the <code>Server</code>/ - * @return The <code>Server</code> with the specified id or null if no - * <code>Server</code> with the id exists. - */ - public Server getServer(String id) - { - return servers.get(id); - } - - /** - * - * Stops and removes the <code>Server</code> from the set of shared servers managed by the <code>MessageBroker</code>. - * - * @param id The id of the <code>Server</code> to remove. - * @return <code>Server</code> that has been removed or <code>null</code> if it doesn't exist. - */ - public Server removeServer(String id) - { - Server server = servers.get(id); - if (server != null) - { - server.stop(); - servers.remove(id); - } - return server; - } - - /** - * - * Creates an <code>Endpoint</code> instance, sets its id and url. 
- * It further sets the endpoint manageable if the <code>MessageBroker</code> - * is manageable, and assigns its <code>MessageBroker</code> to the - * <code>MessageBroker</code> that created it. - * - * @param id The id of the endpoint. - * @param url The url of the endpoint. - * @param className The class name of the endpoint. - * - * @return The created <code>Endpoint</code> instance. - */ - public Endpoint createEndpoint(String id, String url, String className) - { - Class endpointClass = ClassUtil.createClass(className, getClassLoader()); - - Endpoint endpoint = (Endpoint)ClassUtil.createDefaultInstance(endpointClass, Endpoint.class); - endpoint.setId(id); - endpoint.setUrl(url); - endpoint.setManaged(isManaged()); - endpoint.setMessageBroker(this); - - return endpoint; - } - - /** - * - * Add an endpoint to the broker's collection. Broker will accept the endpoint - * to be added only if the endpoint is not null, it does not have null id or - * url, and it does not have the same id or url as another endpoint. - * - * @param endpoint Endpoint to be added. 
- */ - public void addEndpoint(Endpoint endpoint) - { - if (endpoint == null) - { - // Cannot add null ''{0}'' to the ''{1}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{ENDPOINT, MESSAGEBROKER}); - throw ex; - } - - String id = endpoint.getId(); - - if (id == null) - { - // Cannot add ''{0}'' with null id to the ''{1}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{ENDPOINT, MESSAGEBROKER}); - throw ex; - } - - // No need to add if endpoint is already added - if (getEndpoint(id) == endpoint) - return; - - // Do not allow endpoints with the same id - if (getEndpoint(id) != null) - { - // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{ENDPOINT, id, MESSAGEBROKER}); - throw ex; - } - - // Add the endpoint only if its url is not null and it is not registered - // by another channel - checkEndpointUrl(id, endpoint.getUrl()); - - // Finally add the endpoint to endpoints map - endpoints.put(id, endpoint); - } - - /** - * - * Returns the <code>Endpoint</code> with the specified id. - * - * @param id The id of the <code>Endpoint</code>/ - * @return The <code>Endpoint</code> with the specified id or null if no <code>Endpoint</code> with the id exists. - */ - public Endpoint getEndpoint(String id) - { - return endpoints.get(id); - } - - /** - * - * Retrieve the map of all endpoints in this broker. - * - * @return the map of all endpoints in this broker - */ - public Map<String, Endpoint> getEndpoints() - { - return endpoints; - } - - /** - * - * Retrieve an endpoint based on a requested URL path. Two endpoints should not be - * registered to the same path. 
- * - * @param path the URL path - * @param contextPath the context path - * @return endpoint based on a requested URL path - */ - public Endpoint getEndpoint(String path, String contextPath) - { - for (String id : endpoints.keySet()) - { - Endpoint e = endpoints.get(id); - - if (matchEndpoint(path, contextPath, e)) - { - return e; - } - } - MessageException lme = new MessageException(); - lme.setMessage(10003, new Object[] {path}); - throw lme; - } - - /** - * - * Removes an endpoint from the <code>MessageBroker</code>. - * - * @param id The id of the endpoint. - * @return The removed endpoint. - */ - public Endpoint removeEndpoint(String id) - { - Endpoint endpoint = getEndpoint(id); - if (endpoint != null) - { - endpoint.stop(); - endpoints.remove(id); - } - return endpoint; - } - - /** - * Returns whether the endpoint validation is enforced on the server, regardless - * of whether client requested endpoint validation or not. - * - * @return True if the endpoint validation is enforced on the server, regardless - * of whether client requested endpoint validation or not. - */ - public boolean isEnforceEndpointValidation() - { - return enforceEndpointValidation; - } - - /** - * Sets whether the endpoint validation is enforced on the server, regardless - * of whether client requested endpoint validation or not. - * - * @param enforceEndpointValidation The endpoint validation flag. - */ - public void setEnforceEndpointValidation(boolean enforceEndpointValidation) - { - this.enforceEndpointValidation = enforceEndpointValidation; - } - - /** - * Returns the <code>FlexFactory</code> with the specified id. - * - * @param id The id of the <code>FlexFactory</code>. - * @return The <code>FlexFactory</code> with the specified id or null if no - * factory with the id exists. - */ - public FlexFactory getFactory(String id) - { - return factories.get(id); - } - - /** - * Returns the map of <code>FlexFactory</code> instances. 
- * - * @return The map of <code>FlexFactory</code> instances. - */ - public Map<String, FlexFactory> getFactories() - { - return factories; - } - - /** - * Registers a factory with the <code>MessageBroker</code>. - * - * @param id The id of the factory. - * @param factory <code>FlexFactory</code> instance. - */ - public void addFactory(String id, FlexFactory factory) - { - if (id == null) - { - // Cannot add ''{0}'' with null id to the ''{1}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{"FlexFactory", MESSAGEBROKER}); - throw ex; - } - // No need to add if factory is already added - if (getFactory(id) == factory) - { - return; - } - // Do not allow multiple factories with the same id - if (getFactory(id) != null) - { - // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{"FlexFactory", id, MESSAGEBROKER}); - throw ex; - } - factories.put(id, factory); - } - - /** - * Removes the <code>FlexFactory</code> from the list of factories known - * by the <code>MessageBroker</code>. - * - * @param id The id of the <code>FlexFactory</code>. - * @return <code>FlexFactory</code> that has been removed. - */ - public FlexFactory removeFactory(String id) - { - FlexFactory factory = getFactory(id); - if (factory != null) - { - factories.remove(id); - } - return factory; - } - - /** - * Returns the <code>Service</code> with the specified id. - * - * @param id The id of the <code>Service</code>/ - * @return The <code>Service</code> with the specified id or null if no - * <code>Service</code> with the id exists. - */ - public Service getService(String id) - { - return services.get(id); - } - - /** - * Return a service of the specific type. - * The current list of services is searched for the specified class name. 
- * If there is no service of the specified type, null is returned. - * If there is more than one service of the same type it is undefined which instance is returned. - * If more than one service of a specific type is configured, - * callers should use {@link #getService(String)} and provide the service id of the specific service, - * or {@link #getServices()} to get access to the map of all registered services. - * - * @param type the fully qualified class name of the service implementation. - * @return a service or null if not found. - */ - public Service getServiceByType(String type) - { - for (Service svc : services.values()) - { - if (svc.getClass().getName().equals(type)) - { - return svc; - } - } - return null; - } - - /** - * Returns the Map of <code>Service</code> instances. - * - * @return The Map of <code>Service</code> instances. - */ - public Map<String, Service> getServices() - { - return services; - } - - /** - * Returns a <tt>ConfigMap</tt> of service and channel properties that the client - * needs. - * - * @param endpoint Endpoint used to filter the destinations of the service; - * no filtering is done if the endpoint is null. - * @return ConfigMap of server properties. - */ - public ConfigMap describeServices(Endpoint endpoint) - { - return describeServices(endpoint, true); - } - - /** - * - * Returns a <tt>ConfigMap</tt> of service and channel properties that the client - * needs. - * The <tt>allDestinations</tt> flag controls whether configuration for all - * destinations or only reliable client destinations is returned. - * - * @param endpoint Endpoint used to filter the destinations of the service. - * No filtering is done if the endpoint is null. - * @param onlyReliable When false, configuration for all destinations is - * returned instead of only reliable destinations. - * @return ConfigMap of service properties. 
- */ - public ConfigMap describeServices(Endpoint endpoint, boolean onlyReliable) - { - // Let the service validation listeners know about the configuration change. - if (!serviceValidationListeners.isEmpty()) - { - for (Enumeration<ServiceValidationListener> iter = serviceValidationListeners.elements(); iter.hasMoreElements();) - iter.nextElement().validateServices(); - } - - ConfigMap servicesConfig = new ConfigMap(); - - // Keep track of channel ids as we encounter them so we can generate - // the channel properties that might be needed by the client. - ArrayList<String> channelIds = new ArrayList<String>(); - if (endpoint == null) - { - for (Endpoint endpointToAdd: getEndpoints().values()) - channelIds.add(endpointToAdd.getId()); - } - else - { - channelIds.add(endpoint.getId()); - } - - if (defaultChannels != null) - { - ConfigMap defaultChannelsMap = new ConfigMap(); - for (Object defaultChannel : defaultChannels) - { - String id = (String) defaultChannel; - ConfigMap channelConfig = new ConfigMap(); - channelConfig.addProperty(ConfigurationConstants.REF_ATTR, id); - defaultChannelsMap.addProperty(ConfigurationConstants.CHANNEL_ELEMENT, channelConfig); - if (!channelIds.contains(id)) - channelIds.add(id); - } - if (defaultChannelsMap.size() > 0) - servicesConfig.addProperty(ConfigurationConstants.DEFAULT_CHANNELS_ELEMENT, defaultChannelsMap); - } - - for (Service service : services.values()) - { - ConfigMap serviceConfig = service instanceof AbstractService? - ((AbstractService)service).describeService(endpoint, onlyReliable) : service.describeService(endpoint); - if (serviceConfig != null && serviceConfig.size() > 0) - servicesConfig.addProperty(ConfigurationConstants.SERVICE_ELEMENT, serviceConfig); - } - - // Need to send channel properties again in case the client didn't - // compile in services-config.xml and hence doesn't have channels section - // of the configuration - but only if channel/endpoint is not tagged as "remote"! 
- ConfigMap channels = new ConfigMap(); - for (String id : channelIds) - { - Endpoint currentEndpoint = getEndpoint(id); - if (currentEndpoint instanceof AbstractEndpoint && ((AbstractEndpoint)currentEndpoint).isRemote()) - { - continue; // Client already has configuration for "remote" endpoint by other means. - } - - ConfigMap channel = currentEndpoint.describeEndpoint(); - if (channel != null && channel.size() > 0) - channels.addProperty(ConfigurationConstants.CHANNEL_ELEMENT, channel); - } - if (channels.size() > 0) - servicesConfig.addProperty(ConfigurationConstants.CHANNELS_ELEMENT, channels); - - if (Log.isDebug()) - Log.getLogger(ConfigurationManager.LOG_CATEGORY).debug( - "Returning service description for endpoint: " + - (endpoint == null? "all" : endpoint.getId()) + " config: " + servicesConfig); - - return servicesConfig; - } - - /** - * Add a listener for the describeServices callback. The describeServices listener - * is called before any execution of the describeServices method. - * - * @param id Identifier of the listener to add - * @param listener The listener callback - */ - public void addServiceValidationListener(String id, ServiceValidationListener listener) - { - if (listener != null) - { - serviceValidationListeners.putIfAbsent(id, listener); - } - } - - /** - * Returns an <tt>Iterator</tt> for all <tt>ServiceValidationListeners</tt> currently - * registered with the broker. - * - * @return An <tt>Iterator</tt> for all registered <tt>ServiceValidationListeners</tt>. - */ - public Iterator<ServiceValidationListener> getServiceValidationListenerIterator() - { - return serviceValidationListeners.values().iterator(); - } - - /** - * Remove a listener from the describeServices callback. 
- * - * @param id Identifier of the listener to remove - */ - public void removeServiceValidationListener(String id) - { - if (serviceValidationListeners.containsKey(id)) - { - serviceValidationListeners.remove(id); - } - } - - /** - * Creates a <code>Service</code> instance, sets its id, sets it manageable - * if the <code>MessageBroker</code> that created it is manageable, - * and sets its <code>MessageBroker</code> to the <code>MessageBroker</code> that - * created it. - * - * @param id The id of the <code>Service</code>. - * @param className The class name of the <code>Service</code>. - * - * @return The <code>Service</code> instanced created. - */ - public Service createService(String id, String className) - { - Class svcClass = ClassUtil.createClass(className, getClassLoader()); - - Service service = (Service)ClassUtil.createDefaultInstance(svcClass, Service.class); - service.setId(id); - service.setManaged(isManaged()); - service.setMessageBroker(this); - - return service; - } - - /** - * Add a message type -to- service mapping to the broker's collection. - * When the broker attempts to route a message to a service, it finds the first - * service capable of handling the message type. - * - * Note that <code>Service</code> cannot be null, it cannot have a null - * id, and it cannot have the same id or type of a <code>Service</code> - * already registered with the <code>MessageBroker</code>. - * - * <code>Service</code> needs to be started if the <code>MessageBroker</code> - * is already running. 
- * - * @param service The service instance used to handle the messages - * - */ - public void addService(Service service) - { - if (service == null) - { - // Cannot add null ''{0}'' to the ''{1}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{SERVICE, MESSAGEBROKER}); - throw ex; - } - - String id = service.getId(); - - if (id == null) - { - // Cannot add ''{0}'' with null id to the ''{1}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{SERVICE, MESSAGEBROKER}); - throw ex; - } - // No need to add if service is already added - if (getService(id) == service) - { - return; - } - // Do not allow multiple services with the same id - if (getService(id) != null) - { - // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{SERVICE, id, MESSAGEBROKER}); - throw ex; - } - // Not supposed to have multiple services of the same type; warn about it - // but still allow it. - String type = service.getClass().getName(); - if (getServiceByType(type) != null && Log.isWarn()) - Log.getLogger(LOG_CATEGORY).warn("Adding a service type '{0}' that is already registered with the MessageBroker", - new Object[]{type}); - - - services.put(id, service); - - if (service.getMessageBroker() == null || service.getMessageBroker() != this) - { - service.setMessageBroker(this); - } - } - - /** - * Removes the <code>Service</code> from the list of services known - * by the <code>MessageBroker</code>. - * - * @param id The id of the <code>Service</code>. - * @return Previous <code>Service</code> associated with the id. 
- */ - public Service removeService(String id) - { - Service service = getService(id); - if (service != null) - { - service.stop(); - services.remove(id); - } - return service; - } - - /** - * Returns the logger of the <code>MessageBroker</code>. - * - * @return Logger of the <code>MessageBroker</code>. - */ - public Log getLog() - { - return log; - } - - - public LogManager getLogManager() - { - return logManager; - } - - - public LoginManager getLoginManager() - { - return loginManager; - } - - - public void setLoginManager(LoginManager loginManager) - { - if (this.loginManager != null && this.loginManager.isStarted()) - this.loginManager.stop(); - - this.loginManager = loginManager; - - if (isStarted()) - loginManager.start(); - } - - - public FlexClientManager getFlexClientManager() - { - return flexClientManager; - } - - - public void setFlexClientManager(FlexClientManager value) - { - flexClientManager = value; - } - - - public FlexSessionManager getFlexSessionManager() - { - return flexSessionManager; - } - - - public void setFlexSessionManager(FlexSessionManager value) - { - flexSessionManager = value; - } - - - public RedeployManager getRedeployManager() - { - return redeployManager; - } - - - public void setRedeployManager(RedeployManager redeployManager) - { - if (this.redeployManager != null && this.redeployManager.isStarted()) - this.redeployManager.stop(); - - this.redeployManager = redeployManager; - - if (isStarted()) - redeployManager.start(); - } - - - public Class<? extends ThrottleManager> getThrottleManagerClass() - { - return throttleManagerClass; - } - - - public void setThrottleManagerClass(Class<? extends ThrottleManager> throttleManagerClass) - { - this.throttleManagerClass = throttleManagerClass; - } - - /** - * Returns a UUID either from the UUID generator assigned to <tt>MessageBroker</tt>, - * or from the <tt>UUIDUtils#createUUID</tt> if there is no assigned UUID generator. - * - * @return String the UUID. 
- */ - public String createUUID() - { - return uuidGenerator != null? uuidGenerator.createUUID() : UUIDUtils.createUUID(); - } - - /** - * Returns the custom <tt>UUIDGenerator</tt> used by the <tt>MessageBroker</tt> - * for NIO-HTTP session cookie value and <tt>FlexClient</tt> id generation or null if - * the default UUID generator, <tt>UUIDUtils</tt>, is being used. - * - * @return The custom <tt>UUIDGenerator</tt> used by <tt>MessageBroker</tt> or null. - */ - public UUIDGenerator getUUIDGenerator() - { - return uuidGenerator; - } - - - /** - * Sets the custom <tt>UUIDGenerator</tt> used by the <tt>MessageBroker</tt> - * for NIO-HTTP session cookie value and <tt>FlexClient</tt> id generation. - * - * @param value The custom <tt>UUIDGenerator</tt>. - */ - public void setUUIDGenerator(UUIDGenerator value) - { - uuidGenerator = value; - } - - /** - * Returns the list of channel ids known to the <code>MessageBroker</code>. - * - * @return The list of channel ids. - */ - public List<String> getChannelIds() - { - return (endpoints != null && endpoints.size() != 0)? new ArrayList<String>(endpoints.keySet()) : null; - } - - - public ChannelSettings getChannelSettings(String ref) - { - return channelSettings.get(ref); - } - - - public Map<String, ChannelSettings> getAllChannelSettings() - { - return channelSettings; - } - - - public void setChannelSettings(Map<String, ChannelSettings> channelSettings) - { - this.channelSettings = channelSettings; - } - - /** - * Returns the default channel ids of the MessageBroker. If a service - * specifies its own list of channels it overrides these defaults. - * - * @return Default channel ids of the MessageBroker. - */ - public List<String> getDefaultChannels() - { - return defaultChannels; - } - - /** - * Adds the channel id to the list of default channel ids. - * - * @param id The id of the channel to add to the list of default channel ids. 
- */ - public void addDefaultChannel(String id) - { - if (defaultChannels == null) - defaultChannels = new ArrayList<String>(); - else if (defaultChannels.contains(id)) - return; - - List<String> channelIds = getChannelIds(); - if (channelIds == null || !channelIds.contains(id)) - { - // No channel with id ''{0}'' is known by the MessageBroker. - if (Log.isWarn()) - { - Log.getLogger(LOG_CATEGORY).warn("No channel with id '{0}' is known by the MessageBroker." + - " Not adding the channel.", - new Object[]{id}); - } - return; - } - defaultChannels.add(id); - } - - /** - * Sets the default channel ids of the MessageBroker. - * - * @param ids Default channel ids of the MessageBroker. - */ - public void setDefaultChannels(List<String> ids) - { - if (ids != null) - { - List<String> channelIds = getChannelIds(); - for (Iterator<String> iter = ids.iterator(); iter.hasNext();) - { - String id = iter.next(); - if (channelIds == null || !channelIds.contains(id)) - { - iter.remove(); - if (Log.isWarn()) - { - Log.getLogger(LOG_CATEGORY).warn("No channel with id '{0}' is known by the MessageBroker." + - " Not adding the channel.", - new Object[]{id}); - } - } - } - } - defaultChannels = ids; - } - - /** - * Removes the channel id from the list of default channel ids. - * - * @param id The id of the channel to remove from the list of default channel ids. - * @return <code>true</code> if the list contained the channel id. - */ - public boolean removeDefaultChannel(String id) - { - return defaultChannels != null && defaultChannels.remove(id); - } - - /** - * Returns the <code>SecurityConstraint</code> with the indicated - * reference id. - * - * @param ref The reference of the <code>SecurityConstraint</code> - * @return The <code>SecurityConstraint</code> with the indicated reference id. 
- */ - public SecurityConstraint getSecurityConstraint(String ref) - { - return getSecuritySettings().getConstraint(ref); - } - - - public ServletContext getServletContext() - { - return servletContext; - } - - - public SecuritySettings getSecuritySettings() - { - return securitySettings; - } - - - public void setSecuritySettings(SecuritySettings securitySettings) - { - this.securitySettings = securitySettings; - } - - - public SystemSettings getSystemSettings() - { - return systemSettings; - } - - - public void setSystemSettings(SystemSettings l) - { - systemSettings = l; - } - - - public FlexClientSettings getFlexClientSettings() - { - return flexClientSettings; - } - - - public void setFlexClientSettings(FlexClientSettings value) - { - flexClientSettings = value; - } - - - public void initThreadLocals() - { - // No thread-locals anymore, so no-op. - } - - /** - * You can call this method in order to send a message from your code into - * the message routing system. The message is routed to a service that - * is defined to handle messages of this type. Once the service is identified, - * the destination property of the message is used to find a destination - * configured for that service. The adapter defined for that destination - * is used to handle the message. - * - * @param message The message to be routed to a service - * @param endpoint This can identify the endpoint that is sending the message - * but it is currently not used so you may pass in null. - * @return <code>AcknowledgeMessage</code> with result. - */ - public AcknowledgeMessage routeMessageToService(Message message, Endpoint endpoint) - { - // Make sure message has a messageId - checkMessageId(message); - - Object serviceResult = null; - boolean serviced = false; - Service service = null; - String destId = message.getDestination(); - try - { - String serviceId = destId != null ? 
destinationToService.get(destId) : null; - - if ((serviceId == null) && (destId != null) && (!serviceValidationListeners.isEmpty())) - { - for (Enumeration<ServiceValidationListener> iter = serviceValidationListeners.elements(); iter.hasMoreElements();) - { - iter.nextElement().validateDestination(destId); - } - serviceId = destinationToService.get(destId); - } - - if (serviceId != null) - { - service = services.get(serviceId); - serviced = true; - Destination destination = service.getDestination(destId); - inspectOperation(message, destination); - // Remove the validate endpoint header if it was set. - if (message.headerExists(Message.VALIDATE_ENDPOINT_HEADER)) - message.getHeaders().remove(Message.VALIDATE_ENDPOINT_HEADER); - - if (Log.isDebug()) - Log.getLogger(getLogCategory(message)).debug( - "Before invoke service: " + service.getId() + StringUtils.NEWLINE + - " incomingMessage: " + message + StringUtils.NEWLINE); - - extractRemoteCredentials(service, message); - serviceResult = service.serviceMessage(message); - } - - if (!serviced) - { - MessageException lme = new MessageException(); - // The supplied destination id is not registered with any service. 
- lme.setMessage(ERR_MSG_NO_SERVICE_FOR_DEST); - throw lme; - } - - if (Log.isDebug()) - { - String debugServiceResult = Log.getPrettyPrinter().prettify(serviceResult); - Log.getLogger(getLogCategory(message)).debug( - "After invoke service: " + service.getId() + StringUtils.NEWLINE + - " reply: " + debugServiceResult + StringUtils.NEWLINE); - } - - AcknowledgeMessage ack; - if (serviceResult instanceof AcknowledgeMessage) - { - // service will return an ack if they need to transform it in some - // service-specific way (paging is an example) - ack = (AcknowledgeMessage)serviceResult; - } - else - { - // most services will return a result of some sort, possibly null, - // and expect the broker to compose a message to deliver it - ack = new AcknowledgeMessage(); - ack.setBody(serviceResult); - } - ack.setCorrelationId(message.getMessageId()); - ack.setClientId(message.getClientId()); - return ack; - } - catch (MessageException exc) - { - exc.logAtHingePoint(message, - null, /* No outbound error message at this point. */ - "Exception when invoking service '" + (service == null ? "(none)" : service.getId()) + "': "); - - throw exc; - } - catch (RuntimeException exc) - { - Log.getLogger(LogCategories.MESSAGE_GENERAL).error( - "Exception when invoking service: " + - (service == null ? "(none)" : service.getId()) + - StringUtils.NEWLINE + - " with message: " + message + StringUtils.NEWLINE + - ExceptionUtil.exceptionFollowedByRootCausesToString(exc) + StringUtils.NEWLINE); - - throw exc; - } - catch (Error exc) - { - Log.getLogger(LogCategories.MESSAGE_GENERAL).error( - "Error when invoking service: " + - (service == null ? 
"(none)" : service.getId()) + - StringUtils.NEWLINE + - " with message: " + message + StringUtils.NEWLINE + - ExceptionUtil.exceptionFollowedByRootCausesToString(exc) + StringUtils.NEWLINE); - - throw exc; - } - - } - - - public AsyncMessage routeCommandToService(CommandMessage command, Endpoint endpoint) - { - // Make sure command has a messageId - checkMessageId(command); - - String destId = command.getDestination(); - - AsyncMessage replyMessage; - Service service; - String serviceId; - Object commandResult = null; - boolean serviced = false; - boolean recreateHttpFlexSessionAfterLogin = false; - - // Forward login and logout commands to AuthenticationService - int operation = command.getOperation(); - if (operation == CommandMessage.LOGIN_OPERATION || operation == CommandMessage.LOGOUT_OPERATION) - { - serviceId = AUTHENTICATION_SERVICE_ID; - recreateHttpFlexSessionAfterLogin = securitySettings.isRecreateHttpSessionAfterLogin() - && operation == CommandMessage.LOGIN_OPERATION && FlexContext.getFlexSession() instanceof HttpFlexSession; - } - else - { - serviceId = destId != null? destinationToService.get(destId) : null; - } - - service = serviceId != null? services.get(serviceId) : null; - if (service != null) - { - // Before passing the message to the service, need to check - // the security constraints. 
- Destination destination = service.getDestination(destId); - if (destination != null) - inspectOperation(command, destination); - - try - { - extractRemoteCredentials(service, command); - commandResult = service.serviceCommand(command); - serviced = true; - } - catch (UnsupportedOperationException e) - { - ServiceException se = new ServiceException(); - se.setMessage(ERR_MSG_SERVICE_CMD_NOT_SUPPORTED, new Object[] {service.getClass().getName()}); - throw se; - } - catch (SecurityException se) - { - // when a LOGIN message causes a security exception, we want to continue processing here - // to allow metadata to be sent to clients communicating with runtime destinations. - // The result will be an error message with a login fault message as well as the metadata - if (AUTHENTICATION_SERVICE_ID.equals(serviceId)) - { - commandResult = se.createErrorMessage(); - if (Log.isDebug()) - Log.getLogger(LOG_CATEGORY).debug("Security error for message: " + - se.toString() + StringUtils.NEWLINE + - " incomingMessage: " + command + StringUtils.NEWLINE + - " errorReply: " + commandResult); - serviced = true; - } - else - { - throw se; - } - } - } - - if (recreateHttpFlexSessionAfterLogin) - recreateHttpFlexSessionAfterLogin(); - - if (commandResult == null) - { - replyMessage = new AcknowledgeMessage(); - } - else if (commandResult instanceof AsyncMessage) - { - replyMessage = (AsyncMessage)commandResult; - } - else - { - replyMessage = new AcknowledgeMessage(); - replyMessage.setBody(commandResult); - } - - // Update the replyMessage body with server configuration if the - // operation is ping or login and make sure to return the FlexClient Id value. 
- if (command.getOperation() == CommandMessage.CLIENT_PING_OPERATION - || command.getOperation() == CommandMessage.LOGIN_OPERATION) - { - boolean needsConfig = false; - if (command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER) != null) - needsConfig = ((Boolean)(command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER))); - - // Send configuration information only if the client requested. - if (needsConfig) - { - ConfigMap serverConfig = describeServices(endpoint); - if (serverConfig.size() > 0) - replyMessage.setBody(serverConfig); - } - - // Record the features available over this endpoint - double msgVersion = endpoint.getMessagingVersion(); - if (msgVersion > 0) - replyMessage.setHeader(CommandMessage.MESSAGING_VERSION, new Double(msgVersion)); - - // Record the flex client ID - FlexClient flexClient = FlexContext.getFlexClient(); - if (flexClient != null) - replyMessage.setHeader(Message.FLEX_CLIENT_ID_HEADER, flexClient.getId()); - } - else if (!serviced) - { - MessageException lme = new MessageException(); - // The supplied destination id is not registered with any service.. - lme.setMessage(ERR_MSG_NO_SERVICE_FOR_DEST); - throw lme; - } - - replyMessage.setCorrelationId(command.getMessageId()); - replyMessage.setClientId(command.getClientId()); - if (replyMessage.getBody() instanceof java.util.List) - { - replyMessage.setBody(((List) replyMessage.getBody()).toArray()); - } - - if (Log.isDebug()) - Log.getLogger(getLogCategory(command)).debug( - "Executed command: " + - (service == null ? "(default service)" : "service=" + - service.getId()) + StringUtils.NEWLINE + - " commandMessage: " + command + StringUtils.NEWLINE + - " replyMessage: " + replyMessage + StringUtils.NEWLINE); - - return replyMessage; - } - - /** - * Services call this method in order to send a message - * to a FlexClient. 
- * - * @param message the message - * @param messageClient the message client the message should be sent to - */ - public void routeMessageToMessageClient(Message message, MessageClient messageClient) - { - // Make sure message has a messageId - checkMessageId(message); - - // Route the message and the MessageClient (subscription) to the FlexClient to - // queue the message for delivery to the remote client. - // Reset the thread local FlexClient and FlexSession to be specific to the client - // we're pushing to, and then reset the context back to its original request handling state. - FlexClient requestFlexClient = FlexContext.getFlexClient(); - FlexSession requestFlexSession = FlexContext.getFlexSession(); - - FlexClient pushFlexClient = messageClient.getFlexClient(); - FlexContext.setThreadLocalFlexClient(pushFlexClient); - FlexContext.setThreadLocalSession(null); // Null because we don't have a currently active endpoint for the push client. - try - { - pushFlexClient.push(message, messageClient); - } - finally // Reset thread locals. - { - FlexContext.setThreadLocalFlexClient(requestFlexClient); - FlexContext.setThreadLocalSession(requestFlexSession); - } - } - - /** - * - * Check that the destination permits access over the endpoint, the security - * constraint of the destination permits the operation, and the service and - * the destination the message is targeting are running, - * - * @param message The incoming message. - * @param destination The destination to check against. - */ - public void inspectOperation(Message message, Destination destination) - { - inspectChannel(message, destination); - loginManager.checkConstraint(destination.getSecurityConstraint()); - - Service service = destination.getService(); - if (!service.isStarted()) - { - // {0} ''{1}'' cannot service message ''{2}'' in stopped state. 
- MessageException me = new MessageException(); - me.setMessage(ERR_MSG_CANNOT_SERVICE_STOPPED, new Object[]{SERVICE, service.getId(), message.getMessageId()}); - throw me; - } - - if (!destination.isStarted()) - { - // {0} ''{1}'' cannot service message ''{2}'' in stopped state. - MessageException me = new MessageException(); - me.setMessage(ERR_MSG_CANNOT_SERVICE_STOPPED, new Object[]{"Destination", destination.getId(), message.getMessageId()}); - throw me; - } - } - - /** - * - * Verify that this destination permits access over this endpoint. - * - * @param message The incoming message. - * @param destination The destination to check against. - */ - public void inspectChannel(Message message, Destination destination) - { - if (!enforceEndpointValidation && message.getHeader(Message.VALIDATE_ENDPOINT_HEADER) == null) - return; - - String messageChannel = (String)message.getHeader(Message.ENDPOINT_HEADER); - for (String channelId : destination.getChannels()) - { - if (channelId.equals(messageChannel)) - return; - } - MessageException lme = new MessageException(); - lme.setMessage(ERR_MSG_DESTINATION_UNACCESSIBLE, new Object[] {destination.getId(), messageChannel}); - throw lme; - } - - /** - * - * Returns the logging category to use for a given message. - * - * @param message the message - * @return the logging category to use for a given message - */ - public String getLogCategory(Message message) - { - if (message instanceof AbstractMessage) - return ((AbstractMessage) message).logCategory(); - return LogCategories.MESSAGE_GENERAL; - } - - /** - * This is the class loader used by the system to load user defined classes. - * - * @return <code>ClassLoader</code> the system should use to load user defined classes. - */ - public ClassLoader getClassLoader() - { - return classLoader; - } - - /** - * - * Sets the class loader used by the system to load user defined classes. - * - * @param classLoader The class loader used by the system to loader user defiend classes. 
- */ - public void setClassLoader(ClassLoader classLoader) - { - this.classLoader = classLoader; - } - - /** - * - * Used internally by AbstractService to check existence of destination and service id - * mapping in the destinationToService map. - * - * @param destId the destination id - * @param svcId the service id - * @param throwException true if an exception should be thrown if something goes wrong - * @return True if the destination is already registered. - */ - public boolean isDestinationRegistered(String destId, String svcId, boolean throwException) - { - // Do not allow multiple destinations with the same id across services - if (destinationToService.containsKey(destId)) - { - if (throwException) - { - // Cannot add destination with id ''{0}'' to service with id ''{1}'' because another service with id ''{2}'' already has a destination with the same id. - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.DUPLICATE_DEST_ID, new Object[]{destId, svcId, destinationToService.get(destId)}); - throw ex; - } - return true; - } - return false; - } - - /** - * - * Used internally by AbstractService to add destination and service id - * mapping to destinationToService map. - * - * @param destId Destination id. - * @param svcId Service id. - */ - public void registerDestination(String destId, String svcId) - { - // Do not allow multiple destinations with the same id across services - if (destinationToService.containsKey(destId)) - { - // Cannot add destination with id ''{0}'' to service with id ''{1}'' because another service with id ''{2}'' already has a destination with the same id. 
- ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ConfigurationConstants.DUPLICATE_DEST_ID, new Object[]{destId, svcId, destinationToService.get(destId)}); - throw ex; - } - destinationToService.put(destId, svcId); - } - - /** - * - * Used internally by AbstractService to remove destination and service id - * mapping from destinationToService map. - * - * @param destId Destination id. - */ - public void unregisterDestination(String destId) - { - destinationToService.remove(destId); - } - - /** - * - * Looks up and returns a destination by id; removing the need to know which service - * a destination is registered for. - * - * @param destId Destination id. - * @return the Destination oblect for the given destination id - */ - public Destination getRegisteredDestination(String destId) - { - String serviceId = destId != null? destinationToService.get(destId) : null; - return serviceId != null? getService(serviceId).getDestination(destId) : null; - } - - /** - * Increments the count of destinations actively using an Application or Session - * level scoped assembler identified by the passed in attributeId. - * - * @param attributeId Attribute id for the session or application-scoped object. - */ - public void incrementAttributeIdRefCount(String attributeId) - { - synchronized (attributeIdRefCounts) - { - Integer currentCount = attributeIdRefCounts.get(attributeId); - if (currentCount == null) - attributeIdRefCounts.put(attributeId, INTEGER_ONE); - else - attributeIdRefCounts.put(attributeId, currentCount + 1); - } - } - - /** - * Decrements the count of destinations actively using an Application or Session - * level scoped assembler identified by the passed in attributeId. - * - * @param attributeId Attribute id for the session or application-scoped object. 
- * @return in the attribute ID ref count after decrement - */ - public int decrementAttributeIdRefCount(String attributeId) - { - synchronized (attributeIdRefCounts) - { - Integer currentCount = attributeIdRefCounts.get(attributeId); - if (currentCount == null) - return 0; - - int newValue = currentCount -1 ; - attributeIdRefCounts.put(attributeId, newValue); - return newValue; - } - } - - - //-------------------------------------------------------------------------- - // - // Protected Methods - // - //-------------------------------------------------------------------------- - - /** - * - * Utility method to make sure that message has an assigned messageId. - * - * @param message the message that should be checked - */ - protected void checkMessageId(Message message) - { - if (message.getMessageId() == null) - { - MessageException lme = new MessageException(); - lme.setMessage(ERR_MSG_NULL_MESSAGE_ID); - throw lme; - } - } - - /** - * - * Check the headers for the message for the RemoteCredentials. 
- * - * @param service the service - * @param message the message - */ - protected void extractRemoteCredentials(Service service, Message message) - { - if (!message.headerExists(Message.REMOTE_CREDENTIALS_HEADER)) - return; - - boolean setting = false; - String username = null; - String credentials = null; - if (message.getHeader(Message.REMOTE_CREDENTIALS_HEADER) instanceof String) - { - String encoded = (String)message.getHeader(Message.REMOTE_CREDENTIALS_HEADER); - if (encoded.length() > 0) //empty string is clearing the credentials - { - setting = true; - Base64.Decoder decoder = new Base64.Decoder(); - decoder.decode(encoded); - byte[] decodedBytes = decoder.drain(); - String decoded; - - String charset = (String)message.getHeader(Message.REMOTE_CREDENTIALS_CHARSET_HEADER); - if (charset != null) - { - try - { - decoded = new String(decodedBytes, charset); - } - catch (UnsupportedEncodingException ex) - { - MessageException lme = new MessageException(); - lme.setMessage(ERR_MSG_UNKNOWN_REMOTE_CREDENTIALS_FORMAT); - throw lme; - } - } - else - { - decoded = new String(decodedBytes); - } - - int colon = decoded.indexOf(':'); - if (colon > 0 && colon < decoded.length() - 1) - { - username = decoded.substring(0, colon); - credentials = decoded.substring(colon + 1); - } - } - } - else - { - MessageException lme = new MessageException(); - lme.setMessage(ERR_MSG_UNKNOWN_REMOTE_CREDENTIALS_FORMAT); - throw lme; - } - - if (setting) - { - FlexContext.getFlexSession().putRemoteCredentials( - new FlexRemoteCredentials(service.getId(), - message.getDestination(), username, credentials)); - } - else - { - FlexContext.getFlexSession().clearRemoteCredentials(service.getId(), - message.getDestination()); - } - } - - @Override - protected String getLogCategory() - { - return LOG_CATEGORY; - } - - - public void setServletContext(ServletContext servletContext) - { - this.servletContext = servletContext; - } - - /** - * - * This method was added so that Spring-BlazeDS 
Integration 1.0.2 works with latest BlazeDS binaries - * Internally, this method simply invokes the setServletContext(...) method - * - * @param servletContext ServletContext that should be set. - */ - protected void setInitServletContext(ServletContext servletContext) - { - setServletContext(servletContext); - } - - protected void recreateHttpFlexSessionAfterLogin() - { - FlexSession currentHttpFlexSession = FlexContext.getFlexSession(); - Principal principal = currentHttpFlexSession.getUserPrincipal(); - currentHttpFlexSession.invalidate(); // This will recreate a new session. - - FlexSession newHttpFlexSession = FlexContext.getFlexSession(); - newHttpFlexSession.setUserPrincipal(principal); - } - - /** - * Start all of the broker's endpoints. - * - * - */ - protected void startEndpoints() - { - for (Endpoint endpoint : endpoints.values()) - { - if (endpoint instanceof AbstractEndpoint && ((AbstractEndpoint)endpoint).isRemote()) - continue; // Local representation of remote endpoints are not started. - endpoint.start(); - } - } - - /** - * Stop all of the broker's endpoints. - */ - protected void stopEndpoints() - { - for (Endpoint endpoint : endpoints.values()) - { - if (endpoint instanceof AbstractEndpoint && ((AbstractEndpoint)endpoint).isRemote()) - continue; // Local representation of remote endpoints are not stopped. - endpoint.stop(); - } - } - - //-------------------------------------------------------------------------- - // - // Private Methods - // - //-------------------------------------------------------------------------- - - /** - * - */ - private void checkEndpointUrl(String id, String endpointUrl) - { - // Do not allow endpoints with null url property. 
- if (endpointUrl == null) - { - // Cannot add ''{0}'' with null url to the ''{1}'' - ConfigurationException ex = new ConfigurationException(); - ex.setMessage(ERR_MSG_NULL_ENDPOINT_URL, new Object[]{ENDPOINT, MESSAGEBROKER}); - throw ex; - } - - String parsedEndpointURI = ChannelSettings.removeTokens(endpointUrl); - - // first check the original URI - if (registeredEndpoints.containsKey(parsedEndpointURI)) - { - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(ERR_MSG_URI_ALREADY_REGISTERED, new Object[] {id, parsedEndpointURI, - registeredEndpoints.get(parsedEndpointURI)}); - throw ce; - } - - // add the original URI to the registered endpoints map - registeredEndpoints.put(parsedEndpointURI, id); - - // also need to check the URI without the context root - int nextSlash = parsedEndpointURI.indexOf('/', 1); - if (nextSlash > 0) - { - String parsedEndpointURI2 = parsedEndpointURI.substring(nextSlash); - if (registeredEndpoints.containsKey(parsedEndpointURI2)) - { - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(ERR_MSG_URI_ALREADY_REGISTERED, new Object[] { - parsedEndpointURI2, id, - registeredEndpoints.get(parsedEndpointURI2) }); - throw ce; - } - registeredEndpoints.put(parsedEndpointURI2, id); - } - } - - /** - * - * Matches the current "servlet + pathinfo" to a list of channels registered - * in the services configuration file, independent of context root. 
- * - * @param path The Servlet mapping and PathInfo of the current request - * @param contextPath The web application context root (or empty string for default root) - * @param endpoint The endpoint to be matched - * @return whether the current request matches a registered endpoint URI - * - */ - private boolean matchEndpoint(String path, String contextPath, Endpoint endpoint) - { - boolean match = false; - String channelEndpoint = endpoint.getParsedUrl(contextPath); - - if (path.endsWith("/")) - { - path = path.substring(0, path.length() - 1); - } - - if (path.equalsIgnoreCase(channelEndpoint)) - { - match = true; - } - - return match; - } - - private void registerMessageBroker() - { - String mbid = getId(); - - synchronized (messageBrokers) - { - if (messageBrokers.get(mbid) != null) - { - ConfigurationException ce = new ConfigurationException(); - ce.setMessage(10137, new Object[] {getId() == null ? "(no value supplied)" : mbid}); - throw ce; - } - messageBrokers.put(mbid, this); - } - } - - private void unRegisterMessageBroker() - { - String mbid = getId(); - - synchronized (messageBrokers) - { - messageBrokers.remove(mbid); - } - } - - /** - * Start all of the broker's shared servers. - */ - private void startServers() - { - for (Server server : servers.values()) - { - // Validate that the server is actually referenced by an endpoint; if not, warn. - boolean serverIsReferenced = false; - for (Endpoint endpoint : endpoints.values()) - { - if (endpoint instanceof Endpoint2 && server.equals(((Endpoint2)endpoint).getServer())) - { - serverIsReferenced = true; - break; - } - } - - if (!serverIsReferenced && Log.isWarn()) - Log.getLogger(LogCategories.CONFIGURATION).warn("Server '" + server.getId() + "' is not referenced by any endpoints."); - - server.start(); - } - } - - /** - * Stop all the broker's shared servers. - */ - private void stopServers() - { - for (Server server : servers.values()) - server.stop(); - } - - /** - * Start all of the broker's services. 
- * - * - */ - private void startServices() - { - for (Service svc : services.values() ) - { - long timeBeforeStartup = 0; - if (Log.isDebug()) - { - timeBeforeStartup = System.currentTimeMillis(); - Log.getLogger(LOG_CATEGORY_STARTUP_SERVICE).debug("Service with id '{0}' is starting.", - new Object[]{svc.getId()}); - } - - svc.start(); - - if (Log.isDebug()) - { - long timeAfterStartup = System.currentTimeMillis(); - Long diffMillis = timeAfterStartup - timeBeforeStartup; - Log.getLogger(LOG_CATEGORY_STARTUP_SERVICE).debug("Service with id '{0}' is ready (startup time: '{1}' ms)", - new Object[]{svc.getId(), diffMillis}); - } - } - } - - /** - * Stop all of the broker's services. - * - * - */ - private void stopServices() - { - for (Service svc : services.values()) - svc.stop(); - } -}
