http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/HttpFlexSessionProvider.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/HttpFlexSessionProvider.java b/core/src/flex/messaging/HttpFlexSessionProvider.java new file mode 100644 index 0000000..abdcb4c --- /dev/null +++ b/core/src/flex/messaging/HttpFlexSessionProvider.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpSession; + +import flex.messaging.log.Log; + +/** + * Provider implementation for <code>HttpFlexSession</code>s. + * Not intended for public use. + */ +public class HttpFlexSessionProvider extends AbstractFlexSessionProvider +{ + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Factory method to get an existing <tt>HttpFlexSession</tt> for the current request, + * or create and return a new <code>HttpFlexSession</code> if necessary. + * The <code>HttpFlexSession</code> wraps the underlying J2EE <code>HttpSession</code>. + * Not intended for public use. + * + * @param request The current <tt>HttpServletRequest</tt>. + * @return A <tt>HttpFlexSession</tt>. + */ + public HttpFlexSession getOrCreateSession(HttpServletRequest request) + { + HttpFlexSession flexSession; + HttpSession httpSession = request.getSession(true); + + if (!HttpFlexSession.isHttpSessionListener && !HttpFlexSession.warnedNoEventRedispatch) + { + HttpFlexSession.warnedNoEventRedispatch = true; + if (Log.isWarn()) + Log.getLogger(HttpFlexSession.WARN_LOG_CATEGORY).warn("HttpFlexSession has not been registered as a listener in web.xml for this application so no events will be dispatched to FlexSessionAttributeListeners or FlexSessionBindingListeners. To correct this, register flex.messaging.HttpFlexSession as a listener in web.xml."); + } + + boolean isNew = false; + synchronized (httpSession) + { + flexSession = (HttpFlexSession)httpSession.getAttribute(HttpFlexSession.SESSION_ATTRIBUTE); + if (flexSession == null) + { + flexSession = new HttpFlexSession(this); + // Correlate this FlexSession to the HttpSession before triggering any listeners. + FlexContext.setThreadLocalSession(flexSession); + httpSession.setAttribute(HttpFlexSession.SESSION_ATTRIBUTE, flexSession); + flexSession.setHttpSession(httpSession); + isNew = true; + } + else + { + FlexContext.setThreadLocalSession(flexSession); + if (flexSession.httpSession == null) + { + // httpSession is null if the instance is new or is from + // serialization. + flexSession.setHttpSession(httpSession); + isNew = true; + } + } + } + + if (isNew) + { + getFlexSessionManager().registerFlexSession(flexSession); + flexSession.notifyCreated(); + + if (Log.isDebug()) + Log.getLogger(HttpFlexSession.FLEX_SESSION_LOG_CATEGORY).debug("FlexSession created with id '" + flexSession.getId() + "' for an Http-based client connection."); + } + + return flexSession; + } + }
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/MessageBroker.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageBroker.java b/core/src/flex/messaging/MessageBroker.java new file mode 100644 index 0000000..80b061e --- /dev/null +++ b/core/src/flex/messaging/MessageBroker.java @@ -0,0 +1,2252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +import flex.management.ManageableComponent; +import flex.management.runtime.messaging.MessageBrokerControl; +import flex.management.runtime.messaging.log.LogManager; +import flex.messaging.client.FlexClient; +import flex.messaging.client.FlexClientManager; +import flex.messaging.cluster.ClusterManager; +import flex.messaging.config.ChannelSettings; +import flex.messaging.config.ConfigMap; +import flex.messaging.config.ConfigurationConstants; +import flex.messaging.config.ConfigurationException; +import flex.messaging.config.ConfigurationManager; +import flex.messaging.config.FlexClientSettings; +import flex.messaging.config.SecurityConstraint; +import flex.messaging.config.SecuritySettings; +import flex.messaging.config.SystemSettings; +import flex.messaging.endpoints.AbstractEndpoint; +import flex.messaging.endpoints.Endpoint; +import flex.messaging.endpoints.Endpoint2; +import flex.messaging.factories.JavaFactory; +import flex.messaging.io.BeanProxy; +import flex.messaging.io.PropertyProxyRegistry; +import flex.messaging.log.Log; +import flex.messaging.log.LogCategories; +import flex.messaging.messages.AbstractMessage; +import flex.messaging.messages.AcknowledgeMessage; +import flex.messaging.messages.AsyncMessage; +import flex.messaging.messages.CommandMessage; +import flex.messaging.messages.Message; +import flex.messaging.security.LoginManager; +import flex.messaging.security.SecurityException; +import flex.messaging.services.AbstractService; +import flex.messaging.services.Service; +import flex.messaging.services.ServiceException; +import flex.messaging.services.messaging.ThrottleManager; +import flex.messaging.util.Base64; +import flex.messaging.util.ClassUtil; +import flex.messaging.util.ExceptionUtil; +import flex.messaging.util.RedeployManager; +import flex.messaging.util.StringUtils; +import flex.messaging.util.UUIDGenerator; +import flex.messaging.util.UUIDUtils; +import flex.messaging.validators.DeserializationValidator; + +import javax.servlet.ServletContext; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The MessageBroker is the hub of message traffic. It has a number of endpoints which send and + * receive messages over the network, and it has a number of + * services that are message destinations. The broker routes + * decoded messages received by endpoints to services based + * on the service destination specified in each message. + * The broker also has a means of pushing messages back through + * endpoints to clients. + */ +public class MessageBroker extends ManageableComponent +{ + //-------------------------------------------------------------------------- + // + // Public Static Constants + // + //-------------------------------------------------------------------------- + + /** + * Id that the AuthenticationService uses to register itself with the broker. + */ + public static final String AUTHENTICATION_SERVICE_ID = "authentication-service"; + + /** + * Localized error messages for <code>MessageBroker</code>. + */ + public static final int ERR_MSG_NO_SERVICE_FOR_DEST = 10004; + public static final int ERR_MSG_DESTINATION_UNACCESSIBLE = 10005; + public static final int ERR_MSG_UNKNOWN_REMOTE_CREDENTIALS_FORMAT = 10020; + public static final int ERR_MSG_NULL_MESSAGE_ID = 10029; + public static final int ERR_MSG_CANNOT_SERVICE_STOPPED = 10038; + public static final int ERR_MSG_NULL_ENDPOINT_URL = 10128; + public static final int ERR_MSG_SERVICE_CMD_NOT_SUPPORTED = 10451; + public static final int ERR_MSG_URI_ALREADY_REGISTERED = 11109; + + /** + * Log category for <code>MessageBroker</code>. + */ + public static final String LOG_CATEGORY = LogCategories.MESSAGE_GENERAL; + + /** + * Log category that captures startup information for broker's destinations. + */ + public static final String LOG_CATEGORY_STARTUP_SERVICE = LogCategories.STARTUP_SERVICE; + + + public static final String TYPE = "MessageBroker"; + + //-------------------------------------------------------------------------- + // + // Package Protected Static Constants + // + //-------------------------------------------------------------------------- + /** The default message broker id when one is not specified in web.xml. */ + public static final String DEFAULT_BROKER_ID = "__default__"; + + /** A map of currently available message brokers indexed by message broker id. */ + static final Map<String, MessageBroker> messageBrokers = new HashMap<String, MessageBroker>(); + + //-------------------------------------------------------------------------- + // + // Private Static Constants + // + //-------------------------------------------------------------------------- + + private static final String LOG_MANAGER_ID = "log"; + private static final Integer INTEGER_ONE = 1; + private static final String MESSAGEBROKER = "MessageBroker"; + private static final String ENDPOINT = "Endpoint"; + private static final String SERVICE = "Service"; + + //-------------------------------------------------------------------------- + // + // Constructors + // + //-------------------------------------------------------------------------- + + /** + * + * Create a MessageBroker. This constructor will + * establish collections for routers, endpoints, + * and services. + */ + public MessageBroker() + { + this(true, null); + } + + + public MessageBroker(boolean enableManagement) + { + this(enableManagement, null); + } + + + public MessageBroker(boolean enableManagement, String mbid) + { + this(enableManagement, mbid, MessageBroker.class.getClassLoader()); + } + + + public MessageBroker(boolean enableManagement, String mbid, ClassLoader loader) + { + super(enableManagement); + classLoader = loader; + attributes = new ConcurrentHashMap<String, Object>(); + destinationToService = new ConcurrentHashMap<String, String>(); + endpoints = new LinkedHashMap<String, Endpoint>(); + services = new LinkedHashMap<String, Service>(); + servers = new LinkedHashMap<String, Server>(); + factories = new HashMap<String, FlexFactory>(); + registeredEndpoints = new HashMap<String, String>(); + + // Add the built-in java factory + addFactory("java", new JavaFactory()); + + setId(mbid); + + log = Log.createLog(); + + clusterManager = new ClusterManager(this); + systemSettings = new SystemSettings(); + + if (isManaged()) + { + controller = new MessageBrokerControl(this); + controller.register(); + setControl(controller); + + logManager = new LogManager(); + logManager.setLog(log); + logManager.setParent(this); + logManager.setupLogControl(); + logManager.initialize(LOG_MANAGER_ID, null); + } + } + + //-------------------------------------------------------------------------- + // + // Variables + // + //-------------------------------------------------------------------------- + + private Map<String, Object> attributes; + /** + * Map of attribute ids of Application or Session level scoped destination assemblers + * to the number of active destinations referring to. + */ + private final Map<String, Integer> attributeIdRefCounts = new HashMap<String, Integer>(); + private Map<String, ChannelSettings> channelSettings; + private ClassLoader classLoader; + private ClusterManager clusterManager; + private MessageBrokerControl controller; + private List<String> defaultChannels; + private DeserializationValidator deserializationValidator; + private Map<String, String> destinationToService; // destiantionId to serviceId map. + private Map<String, Endpoint> endpoints; + private boolean enforceEndpointValidation; + private Map<String, FlexFactory> factories; + private FlexClientManager flexClientManager; + private FlexClientSettings flexClientSettings; + private FlexSessionManager flexSessionManager; + private PathResolver externalPathResolver; + private InternalPathResolver internalPathResolver; + private Log log; + private LogManager logManager; + private LoginManager loginManager; + private RedeployManager redeployManager; + private Map<String, String> registeredEndpoints; + private SecuritySettings securitySettings; + private Map<String, Service> services; + private Map<String, Server> servers; + private final ConcurrentHashMap<String, ServiceValidationListener> serviceValidationListeners = new ConcurrentHashMap<String, ServiceValidationListener>(); + private ServletContext servletContext; + private SystemSettings systemSettings; + private Class<? extends ThrottleManager> throttleManagerClass = ThrottleManager.class; // The default ThrottleManager class. + private UUIDGenerator uuidGenerator; + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Sets the id of the <code>MessageBroker</code>. If id is null, uses the + * default broker id. + * + * + */ + @Override public void setId(String id) + { + if (id == null) + id = DEFAULT_BROKER_ID; + + super.setId(id); + } + + /** + * Retrieves a message broker with the supplied id. This is defined via + * the servlet init parameter messageBrokerId. If no messageBrokerId is supplied, pass + * in a null value for the id parameter. + * + * In case null is passed, the method tries to lookup the broker from current FlexContext. + * If not available, it uses default ID to lookup the message broker. + * + * @param id The id of the message broker to retrieve. + * @return The <code>MessageBroker</code> for the supplied id. + */ + public static MessageBroker getMessageBroker(String id) + { + if (id == null) + { + // If available, return the broker from FlexContext + MessageBroker broker = FlexContext.getMessageBroker(); + if (broker != null) + { + return broker; + } + + id = DEFAULT_BROKER_ID; + } + + return messageBrokers.get(id); + } + + /** + * Start the message broker's endpoints and services. + * + */ + @Override public void start() + { + if (isStarted()) + return; + + /* + * J2EE can be a real pain in terms of getting the right class loader so dump out + * some detailed info about what is going on. + */ + if (Log.isDebug()) + { + StringBuffer sb = new StringBuffer(100); + if (classLoader == MessageBroker.class.getClassLoader()) + sb.append(" the MessageBroker's class loader"); + if (classLoader == Thread.currentThread().getContextClassLoader()) + { + if (sb.length() > 0) sb.append(" and"); + sb.append(" the context class loader"); + } + if (sb.length() == 0) + sb.append(" not the context or the message broker's class loader"); + Log.getLogger(LogCategories.CONFIGURATION).debug( + "MessageBroker id: " + getId() + " classLoader is:" + + sb.toString() + " (" + "classLoader " + ClassUtil.classLoaderToString(classLoader)); + } + + // Catch any startup errors and log using our log machinery, then rethrow to trigger shutdown. + try + { + // MessageBroker doesn't call super.start() because it doesn't need the + // usual validation that other components need + setStarted(true); + + registerMessageBroker(); + if (flexClientManager == null) + { + flexClientManager = new FlexClientManager(isManaged(), this); + } + flexClientManager.start(); + flexSessionManager = new FlexSessionManager(isManaged(), this); + flexSessionManager.start(); + if (systemSettings == null) + { + systemSettings = new SystemSettings(); + } + startServices(); + loginManager.start(); + startEndpoints(); + startServers(); + redeployManager.start(); + } + catch (Exception e) + { + if (Log.isError()) + Log.getLogger(LogCategories.CONFIGURATION).error("MessageBroker failed to start: " + ExceptionUtil.exceptionFollowedByRootCausesToString(e)); + + // Rethrow. + throw new RuntimeException(e.getMessage(), e); + } + } + + /** + * Stop the broker's endpoints, clusters, and services. + * + */ + @Override public void stop() + { + if (!isStarted()) + return; + + if (Log.isDebug()) + Log.getLogger(LogCategories.CONFIGURATION).debug("MessageBroker stopping: " + getId()); + + serviceValidationListeners.clear(); + + flexSessionManager.stop(); + flexClientManager.stop(); + stopServers(); + stopEndpoints(); + + // set this MB in FlexContext as it is needed for reference counts in destination stopping + FlexContext.setThreadLocalMessageBroker(this); + stopServices(); + FlexContext.setThreadLocalMessageBroker(null); + + if (loginManager != null) + loginManager.stop(); + try + { + if (redeployManager != null) + redeployManager.stop(); + } + catch (Throwable t) + { + t.printStackTrace(); + } + clusterManager.destroyClusters(); + + super.stop(); + unRegisterMessageBroker(); + + // clear static proxy caches + BeanProxy.clear(); + PropertyProxyRegistry.release(); + + // clear system settings + systemSettings.clear(); + systemSettings = null; + + if (Log.isDebug()) + Log.getLogger(LogCategories.CONFIGURATION).debug("MessageBroker stopped: " + getId()); + } + + /** + * Returns an <tt>Iterator</tt> containing the current names that attributes have been bound + * to the <tt>MessageBroker</tt> under. + * Use {@link #getAttribute(String)} to retrieve an attribute value. + * + * @return An iterator containing the current names of the attributes. + */ + public Iterator<String> getAttributeNames() + { + return attributes.keySet().iterator(); + } + + /** + * Returns the attribute value bound to the <tt>MessageBroker</tt> under the provided name. + * + * @param name The attribute name. + * @return Object the attribute object + */ + public Object getAttribute(String name) + { + return attributes.get(name); + } + + /** + * Binds an attribute value to the <tt>MessageBroker</tt> under the provided name. + * + * @param name The attribute name. + * @param value The attribute value. + */ + public void setAttribute(String name, Object value) + { + if (value == null) + removeAttribute(name); + else + attributes.put(name, value); + } + + /** + * Removes the attribute with the given name from the <tt>MessageBroker</tt>. + * + * @param name The attribute name. + */ + public void removeAttribute(String name) + { + attributes.remove(name); + } + + /** + * Returns the deserialization validator of the <tt>MessageBroker</tt> or null + * if none exists. + * + * @return The deserialization validator of the <tt>MessageBroker</tt> or null + * if none exists. + */ + public DeserializationValidator getDeserializationValidator() + { + return deserializationValidator; + } + + /** + * Sets the deserialization validator of the <tt>MessageBroker</tt>. + * + * @param deserializationValidator The deserialization validator. + */ + public void setDeserializationValidator(DeserializationValidator deserializationValidator) + { + this.deserializationValidator = deserializationValidator; + } + + public void setExternalPathResolver(PathResolver externalPathResolver) + { + this.externalPathResolver = externalPathResolver; + } + + + public void setInternalPathResolver(InternalPathResolver internalPathResolver) + { + this.internalPathResolver = internalPathResolver; + } + + + public InputStream resolveExternalPath(String filename) throws IOException + { + return externalPathResolver.resolve(filename); + } + + + public InputStream resolveInternalPath(String filename) throws IOException + { + return internalPathResolver.resolve(filename); + } + + + public interface PathResolver + { + InputStream resolve(String filename) throws IOException; + } + + /** + * This interface is being kept for backwards compatibility. + * + */ + public interface InternalPathResolver extends PathResolver + { + // No-op. + } + + + public ClusterManager getClusterManager() + { + return clusterManager; + } + + /** + * + * Add a <code>Server</code> to the broker's collection. + * + * @param server <code>Server</code> to be added. + */ + public void addServer(Server server) + { + if (server == null) + { + // Cannot add null ''{0}'' to the ''{1}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{"Server", MESSAGEBROKER}); + throw ex; + } + + String id = server.getId(); + + if (id == null) + { + // Cannot add ''{0}'' with null id to the ''{1}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{"Server", MESSAGEBROKER}); + throw ex; + } + + // No need to add if server is already added + Server currentServer = getServer(id); + if (currentServer == server) + return; + + // Do not allow servers with the same id + if (currentServer != null) + { + // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{"Server", id, MESSAGEBROKER}); + throw ex; + } + + servers.put(id, server); + } + + /** + * + * Returns the <code>Server</code> with the specified id. + * + * @param id The id of the <code>Server</code>/ + * @return The <code>Server</code> with the specified id or null if no + * <code>Server</code> with the id exists. + */ + public Server getServer(String id) + { + return servers.get(id); + } + + /** + * + * Stops and removes the <code>Server</code> from the set of shared servers managed by the <code>MessageBroker</code>. + * + * @param id The id of the <code>Server</code> to remove. + * @return <code>Server</code> that has been removed or <code>null</code> if it doesn't exist. + */ + public Server removeServer(String id) + { + Server server = servers.get(id); + if (server != null) + { + server.stop(); + servers.remove(id); + } + return server; + } + + /** + * + * Creates an <code>Endpoint</code> instance, sets its id and url. + * It further sets the endpoint manageable if the <code>MessageBroker</code> + * is manageable, and assigns its <code>MessageBroker</code> to the + * <code>MessageBroker</code> that created it. + * + * @param id The id of the endpoint. + * @param url The url of the endpoint. + * @param className The class name of the endpoint. + * + * @return The created <code>Endpoint</code> instance. + */ + public Endpoint createEndpoint(String id, String url, String className) + { + Class endpointClass = ClassUtil.createClass(className, getClassLoader()); + + Endpoint endpoint = (Endpoint)ClassUtil.createDefaultInstance(endpointClass, Endpoint.class); + endpoint.setId(id); + endpoint.setUrl(url); + endpoint.setManaged(isManaged()); + endpoint.setMessageBroker(this); + + return endpoint; + } + + /** + * + * Add an endpoint to the broker's collection. Broker will accept the endpoint + * to be added only if the endpoint is not null, it does not have null id or + * url, and it does not have the same id or url as another endpoint. + * + * @param endpoint Endpoint to be added. + */ + public void addEndpoint(Endpoint endpoint) + { + if (endpoint == null) + { + // Cannot add null ''{0}'' to the ''{1}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{ENDPOINT, MESSAGEBROKER}); + throw ex; + } + + String id = endpoint.getId(); + + if (id == null) + { + // Cannot add ''{0}'' with null id to the ''{1}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{ENDPOINT, MESSAGEBROKER}); + throw ex; + } + + // No need to add if endpoint is already added + if (getEndpoint(id) == endpoint) + return; + + // Do not allow endpoints with the same id + if (getEndpoint(id) != null) + { + // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{ENDPOINT, id, MESSAGEBROKER}); + throw ex; + } + + // Add the endpoint only if its url is not null and it is not registered + // by another channel + checkEndpointUrl(id, endpoint.getUrl()); + + // Finally add the endpoint to endpoints map + endpoints.put(id, endpoint); + } + + /** + * + * Returns the <code>Endpoint</code> with the specified id. + * + * @param id The id of the <code>Endpoint</code>/ + * @return The <code>Endpoint</code> with the specified id or null if no <code>Endpoint</code> with the id exists. + */ + public Endpoint getEndpoint(String id) + { + return endpoints.get(id); + } + + /** + * + * Retrieve the map of all endpoints in this broker. + * + * @return the map of all endpoints in this broker + */ + public Map<String, Endpoint> getEndpoints() + { + return endpoints; + } + + /** + * + * Retrieve an endpoint based on a requested URL path. Two endpoints should not be + * registered to the same path. + * + * @param path the URL path + * @param contextPath the context path + * @return endpoint based on a requested URL path + */ + public Endpoint getEndpoint(String path, String contextPath) + { + for (String id : endpoints.keySet()) + { + Endpoint e = endpoints.get(id); + + if (matchEndpoint(path, contextPath, e)) + { + return e; + } + } + MessageException lme = new MessageException(); + lme.setMessage(10003, new Object[] {path}); + throw lme; + } + + /** + * + * Removes an endpoint from the <code>MessageBroker</code>. + * + * @param id The id of the endpoint. + * @return The removed endpoint. + */ + public Endpoint removeEndpoint(String id) + { + Endpoint endpoint = getEndpoint(id); + if (endpoint != null) + { + endpoint.stop(); + endpoints.remove(id); + } + return endpoint; + } + + /** + * Returns whether the endpoint validation is enforced on the server, regardless + * of whether client requested endpoint validation or not. + * + * @return True if the endpoint validation is enforced on the server, regardless + * of whether client requested endpoint validation or not. + */ + public boolean isEnforceEndpointValidation() + { + return enforceEndpointValidation; + } + + /** + * Sets whether the endpoint validation is enforced on the server, regardless + * of whether client requested endpoint validation or not. + * + * @param enforceEndpointValidation The endpoint validation flag. + */ + public void setEnforceEndpointValidation(boolean enforceEndpointValidation) + { + this.enforceEndpointValidation = enforceEndpointValidation; + } + + /** + * Returns the <code>FlexFactory</code> with the specified id. + * + * @param id The id of the <code>FlexFactory</code>. + * @return The <code>FlexFactory</code> with the specified id or null if no + * factory with the id exists. + */ + public FlexFactory getFactory(String id) + { + return factories.get(id); + } + + /** + * Returns the map of <code>FlexFactory</code> instances. + * + * @return The map of <code>FlexFactory</code> instances. + */ + public Map<String, FlexFactory> getFactories() + { + return factories; + } + + /** + * Registers a factory with the <code>MessageBroker</code>. + * + * @param id The id of the factory. + * @param factory <code>FlexFactory</code> instance. + */ + public void addFactory(String id, FlexFactory factory) + { + if (id == null) + { + // Cannot add ''{0}'' with null id to the ''{1}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{"FlexFactory", MESSAGEBROKER}); + throw ex; + } + // No need to add if factory is already added + if (getFactory(id) == factory) + { + return; + } + // Do not allow multiple factories with the same id + if (getFactory(id) != null) + { + // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{"FlexFactory", id, MESSAGEBROKER}); + throw ex; + } + factories.put(id, factory); + } + + /** + * Removes the <code>FlexFactory</code> from the list of factories known + * by the <code>MessageBroker</code>. + * + * @param id The id of the <code>FlexFactory</code>. + * @return <code>FlexFactory</code> that has been removed. + */ + public FlexFactory removeFactory(String id) + { + FlexFactory factory = getFactory(id); + if (factory != null) + { + factories.remove(id); + } + return factory; + } + + /** + * Returns the <code>Service</code> with the specified id. + * + * @param id The id of the <code>Service</code>/ + * @return The <code>Service</code> with the specified id or null if no + * <code>Service</code> with the id exists. + */ + public Service getService(String id) + { + return services.get(id); + } + + /** + * Return a service of the specific type. + * The current list of services is searched for the specified class name. + * If there is no service of the specified type, null is returned. + * If there is more than one service of the same type it is undefined which instance is returned. + * If more than one service of a specific type is configured, + * callers should use {@link #getService(String)} and provide the service id of the specific service, + * or {@link #getServices()} to get access to the map of all registered services. + * + * @param type the fully qualified class name of the service implementation. + * @return a service or null if not found. + */ + public Service getServiceByType(String type) + { + for (Service svc : services.values()) + { + if (svc.getClass().getName().equals(type)) + { + return svc; + } + } + return null; + } + + /** + * Returns the Map of <code>Service</code> instances. + * + * @return The Map of <code>Service</code> instances. + */ + public Map<String, Service> getServices() + { + return services; + } + + /** + * Returns a <tt>ConfigMap</tt> of service and channel properties that the client + * needs. + * + * @param endpoint Endpoint used to filter the destinations of the service; + * no filtering is done if the endpoint is null. + * @return ConfigMap of server properties. + */ + public ConfigMap describeServices(Endpoint endpoint) + { + return describeServices(endpoint, true); + } + + /** + * + * Returns a <tt>ConfigMap</tt> of service and channel properties that the client + * needs. + * The <tt>allDestinations</tt> flag controls whether configuration for all + * destinations or only reliable client destinations is returned. + * + * @param endpoint Endpoint used to filter the destinations of the service. + * No filtering is done if the endpoint is null. + * @param onlyReliable When false, configuration for all destinations is + * returned instead of only reliable destinations. + * @return ConfigMap of service properties. + */ + public ConfigMap describeServices(Endpoint endpoint, boolean onlyReliable) + { + // Let the service validation listeners know about the configuration change. + if (!serviceValidationListeners.isEmpty()) + { + for (Enumeration<ServiceValidationListener> iter = serviceValidationListeners.elements(); iter.hasMoreElements();) + iter.nextElement().validateServices(); + } + + ConfigMap servicesConfig = new ConfigMap(); + + // Keep track of channel ids as we encounter them so we can generate + // the channel properties that might be needed by the client. + ArrayList<String> channelIds = new ArrayList<String>(); + if (endpoint == null) + { + for (Endpoint endpointToAdd: getEndpoints().values()) + channelIds.add(endpointToAdd.getId()); + } + else + { + channelIds.add(endpoint.getId()); + } + + if (defaultChannels != null) + { + ConfigMap defaultChannelsMap = new ConfigMap(); + for (Object defaultChannel : defaultChannels) + { + String id = (String) defaultChannel; + ConfigMap channelConfig = new ConfigMap(); + channelConfig.addProperty(ConfigurationConstants.REF_ATTR, id); + defaultChannelsMap.addProperty(ConfigurationConstants.CHANNEL_ELEMENT, channelConfig); + if (!channelIds.contains(id)) + channelIds.add(id); + } + if (defaultChannelsMap.size() > 0) + servicesConfig.addProperty(ConfigurationConstants.DEFAULT_CHANNELS_ELEMENT, defaultChannelsMap); + } + + for (Service service : services.values()) + { + ConfigMap serviceConfig = service instanceof AbstractService? + ((AbstractService)service).describeService(endpoint, onlyReliable) : service.describeService(endpoint); + if (serviceConfig != null && serviceConfig.size() > 0) + servicesConfig.addProperty(ConfigurationConstants.SERVICE_ELEMENT, serviceConfig); + } + + // Need to send channel properties again in case the client didn't + // compile in services-config.xml and hence doesn't have channels section + // of the configuration - but only if channel/endpoint is not tagged as "remote"! + ConfigMap channels = new ConfigMap(); + for (String id : channelIds) + { + Endpoint currentEndpoint = getEndpoint(id); + if (currentEndpoint instanceof AbstractEndpoint && ((AbstractEndpoint)currentEndpoint).isRemote()) + { + continue; // Client already has configuration for "remote" endpoint by other means. + } + + ConfigMap channel = currentEndpoint.describeEndpoint(); + if (channel != null && channel.size() > 0) + channels.addProperty(ConfigurationConstants.CHANNEL_ELEMENT, channel); + } + if (channels.size() > 0) + servicesConfig.addProperty(ConfigurationConstants.CHANNELS_ELEMENT, channels); + + if (Log.isDebug()) + Log.getLogger(ConfigurationManager.LOG_CATEGORY).debug( + "Returning service description for endpoint: " + + (endpoint == null? "all" : endpoint.getId()) + " config: " + servicesConfig); + + return servicesConfig; + } + + /** + * Add a listener for the describeServices callback. The describeServices listener + * is called before any execution of the describeServices method. + * + * @param id Identifier of the listener to add + * @param listener The listener callback + */ + public void addServiceValidationListener(String id, ServiceValidationListener listener) + { + if (listener != null) + { + serviceValidationListeners.putIfAbsent(id, listener); + } + } + + /** + * Returns an <tt>Iterator</tt> for all <tt>ServiceValidationListeners</tt> currently + * registered with the broker. + * + * @return An <tt>Iterator</tt> for all registered <tt>ServiceValidationListeners</tt>. + */ + public Iterator<ServiceValidationListener> getServiceValidationListenerIterator() + { + return serviceValidationListeners.values().iterator(); + } + + /** + * Remove a listener from the describeServices callback. + * + * @param id Identifier of the listener to remove + */ + public void removeServiceValidationListener(String id) + { + if (serviceValidationListeners.containsKey(id)) + { + serviceValidationListeners.remove(id); + } + } + + /** + * Creates a <code>Service</code> instance, sets its id, sets it manageable + * if the <code>MessageBroker</code> that created it is manageable, + * and sets its <code>MessageBroker</code> to the <code>MessageBroker</code> that + * created it. + * + * @param id The id of the <code>Service</code>. + * @param className The class name of the <code>Service</code>. + * + * @return The <code>Service</code> instanced created. + */ + public Service createService(String id, String className) + { + Class svcClass = ClassUtil.createClass(className, getClassLoader()); + + Service service = (Service)ClassUtil.createDefaultInstance(svcClass, Service.class); + service.setId(id); + service.setManaged(isManaged()); + service.setMessageBroker(this); + + return service; + } + + /** + * Add a message type -to- service mapping to the broker's collection. + * When the broker attempts to route a message to a service, it finds the first + * service capable of handling the message type. + * + * Note that <code>Service</code> cannot be null, it cannot have a null + * id, and it cannot have the same id or type of a <code>Service</code> + * already registered with the <code>MessageBroker</code>. + * + * <code>Service</code> needs to be started if the <code>MessageBroker</code> + * is already running. + * + * @param service The service instance used to handle the messages + * + */ + public void addService(Service service) + { + if (service == null) + { + // Cannot add null ''{0}'' to the ''{1}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.NULL_COMPONENT, new Object[]{SERVICE, MESSAGEBROKER}); + throw ex; + } + + String id = service.getId(); + + if (id == null) + { + // Cannot add ''{0}'' with null id to the ''{1}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.NULL_COMPONENT_ID, new Object[]{SERVICE, MESSAGEBROKER}); + throw ex; + } + // No need to add if service is already added + if (getService(id) == service) + { + return; + } + // Do not allow multiple services with the same id + if (getService(id) != null) + { + // Cannot add a ''{0}'' with the id ''{1}'' that is already registered with the ''{2}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.DUPLICATE_COMPONENT_ID, new Object[]{SERVICE, id, MESSAGEBROKER}); + throw ex; + } + // Not supposed to have multiple services of the same type; warn about it + // but still allow it. + String type = service.getClass().getName(); + if (getServiceByType(type) != null && Log.isWarn()) + Log.getLogger(LOG_CATEGORY).warn("Adding a service type '{0}' that is already registered with the MessageBroker", + new Object[]{type}); + + + services.put(id, service); + + if (service.getMessageBroker() == null || service.getMessageBroker() != this) + { + service.setMessageBroker(this); + } + } + + /** + * Removes the <code>Service</code> from the list of services known + * by the <code>MessageBroker</code>. + * + * @param id The id of the <code>Service</code>. + * @return Previous <code>Service</code> associated with the id. + */ + public Service removeService(String id) + { + Service service = getService(id); + if (service != null) + { + service.stop(); + services.remove(id); + } + return service; + } + + /** + * Returns the logger of the <code>MessageBroker</code>. + * + * @return Logger of the <code>MessageBroker</code>. + */ + public Log getLog() + { + return log; + } + + + public LogManager getLogManager() + { + return logManager; + } + + + public LoginManager getLoginManager() + { + return loginManager; + } + + + public void setLoginManager(LoginManager loginManager) + { + if (this.loginManager != null && this.loginManager.isStarted()) + this.loginManager.stop(); + + this.loginManager = loginManager; + + if (isStarted()) + loginManager.start(); + } + + + public FlexClientManager getFlexClientManager() + { + return flexClientManager; + } + + + public void setFlexClientManager(FlexClientManager value) + { + flexClientManager = value; + } + + + public FlexSessionManager getFlexSessionManager() + { + return flexSessionManager; + } + + + public void setFlexSessionManager(FlexSessionManager value) + { + flexSessionManager = value; + } + + + public RedeployManager getRedeployManager() + { + return redeployManager; + } + + + public void setRedeployManager(RedeployManager redeployManager) + { + if (this.redeployManager != null && this.redeployManager.isStarted()) + this.redeployManager.stop(); + + this.redeployManager = redeployManager; + + if (isStarted()) + redeployManager.start(); + } + + + public Class<? extends ThrottleManager> getThrottleManagerClass() + { + return throttleManagerClass; + } + + + public void setThrottleManagerClass(Class<? extends ThrottleManager> throttleManagerClass) + { + this.throttleManagerClass = throttleManagerClass; + } + + /** + * Returns a UUID either from the UUID generator assigned to <tt>MessageBroker</tt>, + * or from the <tt>UUIDUtils#createUUID</tt> if there is no assigned UUID generator. + * + * @return String the UUID. + */ + public String createUUID() + { + return uuidGenerator != null? uuidGenerator.createUUID() : UUIDUtils.createUUID(); + } + + /** + * Returns the custom <tt>UUIDGenerator</tt> used by the <tt>MessageBroker</tt> + * for NIO-HTTP session cookie value and <tt>FlexClient</tt> id generation or null if + * the default UUID generator, <tt>UUIDUtils</tt>, is being used. + * + * @return The custom <tt>UUIDGenerator</tt> used by <tt>MessageBroker</tt> or null. + */ + public UUIDGenerator getUUIDGenerator() + { + return uuidGenerator; + } + + + /** + * Sets the custom <tt>UUIDGenerator</tt> used by the <tt>MessageBroker</tt> + * for NIO-HTTP session cookie value and <tt>FlexClient</tt> id generation. + * + * @param value The custom <tt>UUIDGenerator</tt>. + */ + public void setUUIDGenerator(UUIDGenerator value) + { + uuidGenerator = value; + } + + /** + * Returns the list of channel ids known to the <code>MessageBroker</code>. + * + * @return The list of channel ids. + */ + public List<String> getChannelIds() + { + return (endpoints != null && endpoints.size() != 0)? new ArrayList<String>(endpoints.keySet()) : null; + } + + + public ChannelSettings getChannelSettings(String ref) + { + return channelSettings.get(ref); + } + + + public Map<String, ChannelSettings> getAllChannelSettings() + { + return channelSettings; + } + + + public void setChannelSettings(Map<String, ChannelSettings> channelSettings) + { + this.channelSettings = channelSettings; + } + + /** + * Returns the default channel ids of the MessageBroker. If a service + * specifies its own list of channels it overrides these defaults. + * + * @return Default channel ids of the MessageBroker. + */ + public List<String> getDefaultChannels() + { + return defaultChannels; + } + + /** + * Adds the channel id to the list of default channel ids. + * + * @param id The id of the channel to add to the list of default channel ids. + */ + public void addDefaultChannel(String id) + { + if (defaultChannels == null) + defaultChannels = new ArrayList<String>(); + else if (defaultChannels.contains(id)) + return; + + List<String> channelIds = getChannelIds(); + if (channelIds == null || !channelIds.contains(id)) + { + // No channel with id ''{0}'' is known by the MessageBroker. + if (Log.isWarn()) + { + Log.getLogger(LOG_CATEGORY).warn("No channel with id '{0}' is known by the MessageBroker." + + " Not adding the channel.", + new Object[]{id}); + } + return; + } + defaultChannels.add(id); + } + + /** + * Sets the default channel ids of the MessageBroker. + * + * @param ids Default channel ids of the MessageBroker. + */ + public void setDefaultChannels(List<String> ids) + { + if (ids != null) + { + List<String> channelIds = getChannelIds(); + for (Iterator<String> iter = ids.iterator(); iter.hasNext();) + { + String id = iter.next(); + if (channelIds == null || !channelIds.contains(id)) + { + iter.remove(); + if (Log.isWarn()) + { + Log.getLogger(LOG_CATEGORY).warn("No channel with id '{0}' is known by the MessageBroker." + + " Not adding the channel.", + new Object[]{id}); + } + } + } + } + defaultChannels = ids; + } + + /** + * Removes the channel id from the list of default channel ids. + * + * @param id The id of the channel to remove from the list of default channel ids. + * @return <code>true</code> if the list contained the channel id. + */ + public boolean removeDefaultChannel(String id) + { + return defaultChannels != null && defaultChannels.remove(id); + } + + /** + * Returns the <code>SecurityConstraint</code> with the indicated + * reference id. + * + * @param ref The reference of the <code>SecurityConstraint</code> + * @return The <code>SecurityConstraint</code> with the indicated reference id. + */ + public SecurityConstraint getSecurityConstraint(String ref) + { + return getSecuritySettings().getConstraint(ref); + } + + + public ServletContext getServletContext() + { + return servletContext; + } + + + public SecuritySettings getSecuritySettings() + { + return securitySettings; + } + + + public void setSecuritySettings(SecuritySettings securitySettings) + { + this.securitySettings = securitySettings; + } + + + public SystemSettings getSystemSettings() + { + return systemSettings; + } + + + public void setSystemSettings(SystemSettings l) + { + systemSettings = l; + } + + + public FlexClientSettings getFlexClientSettings() + { + return flexClientSettings; + } + + + public void setFlexClientSettings(FlexClientSettings value) + { + flexClientSettings = value; + } + + + public void initThreadLocals() + { + // No thread-locals anymore, so no-op. + } + + /** + * You can call this method in order to send a message from your code into + * the message routing system. The message is routed to a service that + * is defined to handle messages of this type. Once the service is identified, + * the destination property of the message is used to find a destination + * configured for that service. The adapter defined for that destination + * is used to handle the message. + * + * @param message The message to be routed to a service + * @param endpoint This can identify the endpoint that is sending the message + * but it is currently not used so you may pass in null. + * @return <code>AcknowledgeMessage</code> with result. + */ + public AcknowledgeMessage routeMessageToService(Message message, Endpoint endpoint) + { + // Make sure message has a messageId + checkMessageId(message); + + Object serviceResult = null; + boolean serviced = false; + Service service = null; + String destId = message.getDestination(); + try + { + String serviceId = destId != null ? destinationToService.get(destId) : null; + + if ((serviceId == null) && (destId != null) && (!serviceValidationListeners.isEmpty())) + { + for (Enumeration<ServiceValidationListener> iter = serviceValidationListeners.elements(); iter.hasMoreElements();) + { + iter.nextElement().validateDestination(destId); + } + serviceId = destinationToService.get(destId); + } + + if (serviceId != null) + { + service = services.get(serviceId); + serviced = true; + Destination destination = service.getDestination(destId); + inspectOperation(message, destination); + // Remove the validate endpoint header if it was set. + if (message.headerExists(Message.VALIDATE_ENDPOINT_HEADER)) + message.getHeaders().remove(Message.VALIDATE_ENDPOINT_HEADER); + + if (Log.isDebug()) + Log.getLogger(getLogCategory(message)).debug( + "Before invoke service: " + service.getId() + StringUtils.NEWLINE + + " incomingMessage: " + message + StringUtils.NEWLINE); + + extractRemoteCredentials(service, message); + serviceResult = service.serviceMessage(message); + } + + if (!serviced) + { + MessageException lme = new MessageException(); + // The supplied destination id is not registered with any service. + lme.setMessage(ERR_MSG_NO_SERVICE_FOR_DEST); + throw lme; + } + + if (Log.isDebug()) + { + String debugServiceResult = Log.getPrettyPrinter().prettify(serviceResult); + Log.getLogger(getLogCategory(message)).debug( + "After invoke service: " + service.getId() + StringUtils.NEWLINE + + " reply: " + debugServiceResult + StringUtils.NEWLINE); + } + + AcknowledgeMessage ack; + if (serviceResult instanceof AcknowledgeMessage) + { + // service will return an ack if they need to transform it in some + // service-specific way (paging is an example) + ack = (AcknowledgeMessage)serviceResult; + } + else + { + // most services will return a result of some sort, possibly null, + // and expect the broker to compose a message to deliver it + ack = new AcknowledgeMessage(); + ack.setBody(serviceResult); + } + ack.setCorrelationId(message.getMessageId()); + ack.setClientId(message.getClientId()); + return ack; + } + catch (MessageException exc) + { + exc.logAtHingePoint(message, + null, /* No outbound error message at this point. */ + "Exception when invoking service '" + (service == null ? "(none)" : service.getId()) + "': "); + + throw exc; + } + catch (RuntimeException exc) + { + Log.getLogger(LogCategories.MESSAGE_GENERAL).error( + "Exception when invoking service: " + + (service == null ? "(none)" : service.getId()) + + StringUtils.NEWLINE + + " with message: " + message + StringUtils.NEWLINE + + ExceptionUtil.exceptionFollowedByRootCausesToString(exc) + StringUtils.NEWLINE); + + throw exc; + } + catch (Error exc) + { + Log.getLogger(LogCategories.MESSAGE_GENERAL).error( + "Error when invoking service: " + + (service == null ? "(none)" : service.getId()) + + StringUtils.NEWLINE + + " with message: " + message + StringUtils.NEWLINE + + ExceptionUtil.exceptionFollowedByRootCausesToString(exc) + StringUtils.NEWLINE); + + throw exc; + } + + } + + + public AsyncMessage routeCommandToService(CommandMessage command, Endpoint endpoint) + { + // Make sure command has a messageId + checkMessageId(command); + + String destId = command.getDestination(); + + AsyncMessage replyMessage; + Service service; + String serviceId; + Object commandResult = null; + boolean serviced = false; + boolean recreateHttpFlexSessionAfterLogin = false; + + // Forward login and logout commands to AuthenticationService + int operation = command.getOperation(); + if (operation == CommandMessage.LOGIN_OPERATION || operation == CommandMessage.LOGOUT_OPERATION) + { + serviceId = AUTHENTICATION_SERVICE_ID; + recreateHttpFlexSessionAfterLogin = securitySettings.isRecreateHttpSessionAfterLogin() + && operation == CommandMessage.LOGIN_OPERATION && FlexContext.getFlexSession() instanceof HttpFlexSession; + } + else + { + serviceId = destId != null? destinationToService.get(destId) : null; + } + + service = serviceId != null? services.get(serviceId) : null; + if (service != null) + { + // Before passing the message to the service, need to check + // the security constraints. + Destination destination = service.getDestination(destId); + if (destination != null) + inspectOperation(command, destination); + + try + { + extractRemoteCredentials(service, command); + commandResult = service.serviceCommand(command); + serviced = true; + } + catch (UnsupportedOperationException e) + { + ServiceException se = new ServiceException(); + se.setMessage(ERR_MSG_SERVICE_CMD_NOT_SUPPORTED, new Object[] {service.getClass().getName()}); + throw se; + } + catch (SecurityException se) + { + // when a LOGIN message causes a security exception, we want to continue processing here + // to allow metadata to be sent to clients communicating with runtime destinations. + // The result will be an error message with a login fault message as well as the metadata + if (AUTHENTICATION_SERVICE_ID.equals(serviceId)) + { + commandResult = se.createErrorMessage(); + if (Log.isDebug()) + Log.getLogger(LOG_CATEGORY).debug("Security error for message: " + + se.toString() + StringUtils.NEWLINE + + " incomingMessage: " + command + StringUtils.NEWLINE + + " errorReply: " + commandResult); + serviced = true; + } + else + { + throw se; + } + } + } + + if (recreateHttpFlexSessionAfterLogin) + recreateHttpFlexSessionAfterLogin(); + + if (commandResult == null) + { + replyMessage = new AcknowledgeMessage(); + } + else if (commandResult instanceof AsyncMessage) + { + replyMessage = (AsyncMessage)commandResult; + } + else + { + replyMessage = new AcknowledgeMessage(); + replyMessage.setBody(commandResult); + } + + // Update the replyMessage body with server configuration if the + // operation is ping or login and make sure to return the FlexClient Id value. + if (command.getOperation() == CommandMessage.CLIENT_PING_OPERATION + || command.getOperation() == CommandMessage.LOGIN_OPERATION) + { + boolean needsConfig = false; + if (command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER) != null) + needsConfig = ((Boolean)(command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER))); + + // Send configuration information only if the client requested. + if (needsConfig) + { + ConfigMap serverConfig = describeServices(endpoint); + if (serverConfig.size() > 0) + replyMessage.setBody(serverConfig); + } + + // Record the features available over this endpoint + double msgVersion = endpoint.getMessagingVersion(); + if (msgVersion > 0) + replyMessage.setHeader(CommandMessage.MESSAGING_VERSION, new Double(msgVersion)); + + // Record the flex client ID + FlexClient flexClient = FlexContext.getFlexClient(); + if (flexClient != null) + replyMessage.setHeader(Message.FLEX_CLIENT_ID_HEADER, flexClient.getId()); + } + else if (!serviced) + { + MessageException lme = new MessageException(); + // The supplied destination id is not registered with any service.. + lme.setMessage(ERR_MSG_NO_SERVICE_FOR_DEST); + throw lme; + } + + replyMessage.setCorrelationId(command.getMessageId()); + replyMessage.setClientId(command.getClientId()); + if (replyMessage.getBody() instanceof java.util.List) + { + replyMessage.setBody(((List) replyMessage.getBody()).toArray()); + } + + if (Log.isDebug()) + Log.getLogger(getLogCategory(command)).debug( + "Executed command: " + + (service == null ? "(default service)" : "service=" + + service.getId()) + StringUtils.NEWLINE + + " commandMessage: " + command + StringUtils.NEWLINE + + " replyMessage: " + replyMessage + StringUtils.NEWLINE); + + return replyMessage; + } + + /** + * Services call this method in order to send a message + * to a FlexClient. + * + * @param message the message + * @param messageClient the message client the message should be sent to + */ + public void routeMessageToMessageClient(Message message, MessageClient messageClient) + { + // Make sure message has a messageId + checkMessageId(message); + + // Route the message and the MessageClient (subscription) to the FlexClient to + // queue the message for delivery to the remote client. + // Reset the thread local FlexClient and FlexSession to be specific to the client + // we're pushing to, and then reset the context back to its original request handling state. + FlexClient requestFlexClient = FlexContext.getFlexClient(); + FlexSession requestFlexSession = FlexContext.getFlexSession(); + + FlexClient pushFlexClient = messageClient.getFlexClient(); + FlexContext.setThreadLocalFlexClient(pushFlexClient); + FlexContext.setThreadLocalSession(null); // Null because we don't have a currently active endpoint for the push client. + try + { + pushFlexClient.push(message, messageClient); + } + finally // Reset thread locals. + { + FlexContext.setThreadLocalFlexClient(requestFlexClient); + FlexContext.setThreadLocalSession(requestFlexSession); + } + } + + /** + * + * Check that the destination permits access over the endpoint, the security + * constraint of the destination permits the operation, and the service and + * the destination the message is targeting are running, + * + * @param message The incoming message. + * @param destination The destination to check against. + */ + public void inspectOperation(Message message, Destination destination) + { + inspectChannel(message, destination); + loginManager.checkConstraint(destination.getSecurityConstraint()); + + Service service = destination.getService(); + if (!service.isStarted()) + { + // {0} ''{1}'' cannot service message ''{2}'' in stopped state. + MessageException me = new MessageException(); + me.setMessage(ERR_MSG_CANNOT_SERVICE_STOPPED, new Object[]{SERVICE, service.getId(), message.getMessageId()}); + throw me; + } + + if (!destination.isStarted()) + { + // {0} ''{1}'' cannot service message ''{2}'' in stopped state. + MessageException me = new MessageException(); + me.setMessage(ERR_MSG_CANNOT_SERVICE_STOPPED, new Object[]{"Destination", destination.getId(), message.getMessageId()}); + throw me; + } + } + + /** + * + * Verify that this destination permits access over this endpoint. + * + * @param message The incoming message. + * @param destination The destination to check against. + */ + public void inspectChannel(Message message, Destination destination) + { + if (!enforceEndpointValidation && message.getHeader(Message.VALIDATE_ENDPOINT_HEADER) == null) + return; + + String messageChannel = (String)message.getHeader(Message.ENDPOINT_HEADER); + for (String channelId : destination.getChannels()) + { + if (channelId.equals(messageChannel)) + return; + } + MessageException lme = new MessageException(); + lme.setMessage(ERR_MSG_DESTINATION_UNACCESSIBLE, new Object[] {destination.getId(), messageChannel}); + throw lme; + } + + /** + * + * Returns the logging category to use for a given message. + * + * @param message the message + * @return the logging category to use for a given message + */ + public String getLogCategory(Message message) + { + if (message instanceof AbstractMessage) + return ((AbstractMessage) message).logCategory(); + return LogCategories.MESSAGE_GENERAL; + } + + /** + * This is the class loader used by the system to load user defined classes. + * + * @return <code>ClassLoader</code> the system should use to load user defined classes. + */ + public ClassLoader getClassLoader() + { + return classLoader; + } + + /** + * + * Sets the class loader used by the system to load user defined classes. + * + * @param classLoader The class loader used by the system to loader user defiend classes. + */ + public void setClassLoader(ClassLoader classLoader) + { + this.classLoader = classLoader; + } + + /** + * + * Used internally by AbstractService to check existence of destination and service id + * mapping in the destinationToService map. + * + * @param destId the destination id + * @param svcId the service id + * @param throwException true if an exception should be thrown if something goes wrong + * @return True if the destination is already registered. + */ + public boolean isDestinationRegistered(String destId, String svcId, boolean throwException) + { + // Do not allow multiple destinations with the same id across services + if (destinationToService.containsKey(destId)) + { + if (throwException) + { + // Cannot add destination with id ''{0}'' to service with id ''{1}'' because another service with id ''{2}'' already has a destination with the same id. + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.DUPLICATE_DEST_ID, new Object[]{destId, svcId, destinationToService.get(destId)}); + throw ex; + } + return true; + } + return false; + } + + /** + * + * Used internally by AbstractService to add destination and service id + * mapping to destinationToService map. + * + * @param destId Destination id. + * @param svcId Service id. + */ + public void registerDestination(String destId, String svcId) + { + // Do not allow multiple destinations with the same id across services + if (destinationToService.containsKey(destId)) + { + // Cannot add destination with id ''{0}'' to service with id ''{1}'' because another service with id ''{2}'' already has a destination with the same id. + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ConfigurationConstants.DUPLICATE_DEST_ID, new Object[]{destId, svcId, destinationToService.get(destId)}); + throw ex; + } + destinationToService.put(destId, svcId); + } + + /** + * + * Used internally by AbstractService to remove destination and service id + * mapping from destinationToService map. + * + * @param destId Destination id. + */ + public void unregisterDestination(String destId) + { + destinationToService.remove(destId); + } + + /** + * + * Looks up and returns a destination by id; removing the need to know which service + * a destination is registered for. + * + * @param destId Destination id. + * @return the Destination oblect for the given destination id + */ + public Destination getRegisteredDestination(String destId) + { + String serviceId = destId != null? destinationToService.get(destId) : null; + return serviceId != null? getService(serviceId).getDestination(destId) : null; + } + + /** + * Increments the count of destinations actively using an Application or Session + * level scoped assembler identified by the passed in attributeId. + * + * @param attributeId Attribute id for the session or application-scoped object. + */ + public void incrementAttributeIdRefCount(String attributeId) + { + synchronized (attributeIdRefCounts) + { + Integer currentCount = attributeIdRefCounts.get(attributeId); + if (currentCount == null) + attributeIdRefCounts.put(attributeId, INTEGER_ONE); + else + attributeIdRefCounts.put(attributeId, currentCount + 1); + } + } + + /** + * Decrements the count of destinations actively using an Application or Session + * level scoped assembler identified by the passed in attributeId. + * + * @param attributeId Attribute id for the session or application-scoped object. + * @return in the attribute ID ref count after decrement + */ + public int decrementAttributeIdRefCount(String attributeId) + { + synchronized (attributeIdRefCounts) + { + Integer currentCount = attributeIdRefCounts.get(attributeId); + if (currentCount == null) + return 0; + + int newValue = currentCount -1 ; + attributeIdRefCounts.put(attributeId, newValue); + return newValue; + } + } + + + //-------------------------------------------------------------------------- + // + // Protected Methods + // + //-------------------------------------------------------------------------- + + /** + * + * Utility method to make sure that message has an assigned messageId. + * + * @param message the message that should be checked + */ + protected void checkMessageId(Message message) + { + if (message.getMessageId() == null) + { + MessageException lme = new MessageException(); + lme.setMessage(ERR_MSG_NULL_MESSAGE_ID); + throw lme; + } + } + + /** + * + * Check the headers for the message for the RemoteCredentials. + * + * @param service the service + * @param message the message + */ + protected void extractRemoteCredentials(Service service, Message message) + { + if (!message.headerExists(Message.REMOTE_CREDENTIALS_HEADER)) + return; + + boolean setting = false; + String username = null; + String credentials = null; + if (message.getHeader(Message.REMOTE_CREDENTIALS_HEADER) instanceof String) + { + String encoded = (String)message.getHeader(Message.REMOTE_CREDENTIALS_HEADER); + if (encoded.length() > 0) //empty string is clearing the credentials + { + setting = true; + Base64.Decoder decoder = new Base64.Decoder(); + decoder.decode(encoded); + byte[] decodedBytes = decoder.drain(); + String decoded; + + String charset = (String)message.getHeader(Message.REMOTE_CREDENTIALS_CHARSET_HEADER); + if (charset != null) + { + try + { + decoded = new String(decodedBytes, charset); + } + catch (UnsupportedEncodingException ex) + { + MessageException lme = new MessageException(); + lme.setMessage(ERR_MSG_UNKNOWN_REMOTE_CREDENTIALS_FORMAT); + throw lme; + } + } + else + { + decoded = new String(decodedBytes); + } + + int colon = decoded.indexOf(':'); + if (colon > 0 && colon < decoded.length() - 1) + { + username = decoded.substring(0, colon); + credentials = decoded.substring(colon + 1); + } + } + } + else + { + MessageException lme = new MessageException(); + lme.setMessage(ERR_MSG_UNKNOWN_REMOTE_CREDENTIALS_FORMAT); + throw lme; + } + + if (setting) + { + FlexContext.getFlexSession().putRemoteCredentials( + new FlexRemoteCredentials(service.getId(), + message.getDestination(), username, credentials)); + } + else + { + FlexContext.getFlexSession().clearRemoteCredentials(service.getId(), + message.getDestination()); + } + } + + @Override + protected String getLogCategory() + { + return LOG_CATEGORY; + } + + + public void setServletContext(ServletContext servletContext) + { + this.servletContext = servletContext; + } + + /** + * + * This method was added so that Spring-BlazeDS Integration 1.0.2 works with latest BlazeDS binaries + * Internally, this method simply invokes the setServletContext(...) method + * + * @param servletContext ServletContext that should be set. + */ + protected void setInitServletContext(ServletContext servletContext) + { + setServletContext(servletContext); + } + + protected void recreateHttpFlexSessionAfterLogin() + { + FlexSession currentHttpFlexSession = FlexContext.getFlexSession(); + Principal principal = currentHttpFlexSession.getUserPrincipal(); + currentHttpFlexSession.invalidate(); // This will recreate a new session. + + FlexSession newHttpFlexSession = FlexContext.getFlexSession(); + newHttpFlexSession.setUserPrincipal(principal); + } + + /** + * Start all of the broker's endpoints. + * + * + */ + protected void startEndpoints() + { + for (Endpoint endpoint : endpoints.values()) + { + if (endpoint instanceof AbstractEndpoint && ((AbstractEndpoint)endpoint).isRemote()) + continue; // Local representation of remote endpoints are not started. + endpoint.start(); + } + } + + /** + * Stop all of the broker's endpoints. + */ + protected void stopEndpoints() + { + for (Endpoint endpoint : endpoints.values()) + { + if (endpoint instanceof AbstractEndpoint && ((AbstractEndpoint)endpoint).isRemote()) + continue; // Local representation of remote endpoints are not stopped. + endpoint.stop(); + } + } + + //-------------------------------------------------------------------------- + // + // Private Methods + // + //-------------------------------------------------------------------------- + + /** + * + */ + private void checkEndpointUrl(String id, String endpointUrl) + { + // Do not allow endpoints with null url property. + if (endpointUrl == null) + { + // Cannot add ''{0}'' with null url to the ''{1}'' + ConfigurationException ex = new ConfigurationException(); + ex.setMessage(ERR_MSG_NULL_ENDPOINT_URL, new Object[]{ENDPOINT, MESSAGEBROKER}); + throw ex; + } + + String parsedEndpointURI = ChannelSettings.removeTokens(endpointUrl); + + // first check the original URI + if (registeredEndpoints.containsKey(parsedEndpointURI)) + { + ConfigurationException ce = new ConfigurationException(); + ce.setMessage(ERR_MSG_URI_ALREADY_REGISTERED, new Object[] {id, parsedEndpointURI, + registeredEndpoints.get(parsedEndpointURI)}); + throw ce; + } + + // add the original URI to the registered endpoints map + registeredEndpoints.put(parsedEndpointURI, id); + + // also need to check the URI without the context root + int nextSlash = parsedEndpointURI.indexOf('/', 1); + if (nextSlash > 0) + { + String parsedEndpointURI2 = parsedEndpointURI.substring(nextSlash); + if (registeredEndpoints.containsKey(parsedEndpointURI2)) + { + ConfigurationException ce = new ConfigurationException(); + ce.setMessage(ERR_MSG_URI_ALREADY_REGISTERED, new Object[] { + parsedEndpointURI2, id, + registeredEndpoints.get(parsedEndpointURI2) }); + throw ce; + } + registeredEndpoints.put(parsedEndpointURI2, id); + } + } + + /** + * + * Matches the current "servlet + pathinfo" to a list of channels registered + * in the services configuration file, independent of context root. + * + * @param path The Servlet mapping and PathInfo of the current request + * @param contextPath The web application context root (or empty string for default root) + * @param endpoint The endpoint to be matched + * @return whether the current request matches a registered endpoint URI + * + */ + private boolean matchEndpoint(String path, String contextPath, Endpoint endpoint) + { + boolean match = false; + String channelEndpoint = endpoint.getParsedUrl(contextPath); + + if (path.endsWith("/")) + { + path = path.substring(0, path.length() - 1); + } + + if (path.equalsIgnoreCase(channelEndpoint)) + { + match = true; + } + + return match; + } + + private void registerMessageBroker() + { + String mbid = getId(); + + synchronized (messageBrokers) + { + if (messageBrokers.get(mbid) != null) + { + ConfigurationException ce = new ConfigurationException(); + ce.setMessage(10137, new Object[] {getId() == null ? "(no value supplied)" : mbid}); + throw ce; + } + messageBrokers.put(mbid, this); + } + } + + private void unRegisterMessageBroker() + { + String mbid = getId(); + + synchronized (messageBrokers) + { + messageBrokers.remove(mbid); + } + } + + /** + * Start all of the broker's shared servers. + */ + private void startServers() + { + for (Server server : servers.values()) + { + // Validate that the server is actually referenced by an endpoint; if not, warn. + boolean serverIsReferenced = false; + for (Endpoint endpoint : endpoints.values()) + { + if (endpoint instanceof Endpoint2 && server.equals(((Endpoint2)endpoint).getServer())) + { + serverIsReferenced = true; + break; + } + } + + if (!serverIsReferenced && Log.isWarn()) + Log.getLogger(LogCategories.CONFIGURATION).warn("Server '" + server.getId() + "' is not referenced by any endpoints."); + + server.start(); + } + } + + /** + * Stop all the broker's shared servers. + */ + private void stopServers() + { + for (Server server : servers.values()) + server.stop(); + } + + /** + * Start all of the broker's services. + * + * + */ + private void startServices() + { + for (Service svc : services.values() ) + { + long timeBeforeStartup = 0; + if (Log.isDebug()) + { + timeBeforeStartup = System.currentTimeMillis(); + Log.getLogger(LOG_CATEGORY_STARTUP_SERVICE).debug("Service with id '{0}' is starting.", + new Object[]{svc.getId()}); + } + + svc.start(); + + if (Log.isDebug()) + { + long timeAfterStartup = System.currentTimeMillis(); + Long diffMillis = timeAfterStartup - timeBeforeStartup; + Log.getLogger(LOG_CATEGORY_STARTUP_SERVICE).debug("Service with id '{0}' is ready (startup time: '{1}' ms)", + new Object[]{svc.getId(), diffMillis}); + } + } + } + + /** + * Stop all of the broker's services. + * + * + */ + private void stopServices() + { + for (Service svc : services.values()) + svc.stop(); + } +}
