http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/endpoints/AbstractEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/AbstractEndpoint.java b/core/src/flex/messaging/endpoints/AbstractEndpoint.java new file mode 100644 index 0000000..c265ea9 --- /dev/null +++ b/core/src/flex/messaging/endpoints/AbstractEndpoint.java @@ -0,0 +1,1513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.endpoints; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import flex.management.ManageableComponent; +import flex.management.runtime.messaging.MessageBrokerControl; +import flex.management.runtime.messaging.endpoints.EndpointControl; +import flex.messaging.FlexContext; +import flex.messaging.FlexSession; +import flex.messaging.MessageBroker; +import flex.messaging.MessageException; +import flex.messaging.Server; +import flex.messaging.client.FlexClient; +import flex.messaging.client.FlexClientOutboundQueueProcessor; +import flex.messaging.client.FlushResult; +import flex.messaging.client.PollFlushResult; +import flex.messaging.client.UserAgentSettings; +import flex.messaging.config.ChannelSettings; +import flex.messaging.config.ConfigMap; +import flex.messaging.config.ConfigurationConstants; +import flex.messaging.config.ConfigurationException; +import flex.messaging.config.SecurityConstraint; +import flex.messaging.io.ClassAliasRegistry; +import flex.messaging.io.SerializationContext; +import flex.messaging.io.TypeMarshaller; +import flex.messaging.io.TypeMarshallingContext; +import flex.messaging.io.amf.translator.ASTranslator; +import flex.messaging.log.Log; +import flex.messaging.log.LogCategories; +import flex.messaging.log.Logger; +import flex.messaging.messages.AcknowledgeMessage; +import flex.messaging.messages.AcknowledgeMessageExt; +import flex.messaging.messages.AsyncMessage; +import flex.messaging.messages.AsyncMessageExt; +import flex.messaging.messages.CommandMessage; +import flex.messaging.messages.CommandMessageExt; +import flex.messaging.messages.Message; +import flex.messaging.messages.SmallMessage; +import flex.messaging.security.SecurityException; +import flex.messaging.util.ClassUtil; +import flex.messaging.util.StringUtils; +import flex.messaging.util.UserAgentManager; +import flex.messaging.validators.DeserializationValidator; + +/** + * This is the default implementation of Endpoint, which provides a convenient + * base for behavior and associations common to all endpoints. + * + * These properties that appear in the endpoint configuration are only used by the + * client, therefore they have to be set on the appropriate client classes: connect-timeout-seconds set on Channel. + * + * @see flex.messaging.endpoints.Endpoint + */ +public abstract class AbstractEndpoint extends ManageableComponent + implements Endpoint2, ConfigurationConstants +{ + /** Log category for <code>AbstractEndpoint</code>. */ + public static final String LOG_CATEGORY = LogCategories.ENDPOINT_GENERAL; + + /** + * HTTP header field names. + */ + public static final String HEADER_NAME_CACHE_CONTROL = "Cache-Control"; + public static final String HEADER_NAME_EXPIRES = "Expires"; + public static final String HEADER_NAME_PRAGMA = "Pragma"; + + // Errors + private static final int NONSECURE_PROTOCOL = 10066; + private static final int REQUIRES_FLEXCLIENT_SUPPORT = 10030; + private static final int ERR_MSG_INVALID_URL_SCHEME = 11100; + + // XML Configuration Properties + private static final String SERIALIZATION = "serialization"; + private static final String CREATE_ASOBJECT_FOR_MISSING_TYPE = "create-asobject-for-missing-type"; + private static final String CUSTOM_DESERIALIZER = "custom-deserializer"; + private static final String CUSTOM_SERIALIZER = "custom-serializer"; + private static final String ENABLE_SMALL_MESSAGES = "enable-small-messages"; + private static final String TYPE_MARSHALLER = "type-marshaller"; + private static final String RESTORE_REFERENCES = "restore-references"; + private static final String INSTANTIATE_TYPES = "instantiate-types"; + private static final String SUPPORT_REMOTE_CLASS = "support-remote-class"; + private static final String LEGACY_COLLECTION = "legacy-collection"; + private static final String LEGACY_DICTIONARY = "legacy-dictionary"; + private static final String LEGACY_MAP = "legacy-map"; + private static final String LEGACY_XML = "legacy-xml"; + private static final String LEGACY_XML_NAMESPACES = "legacy-xml-namespaces"; + private static final String LEGACY_THROWABLE = "legacy-throwable"; + private static final String LEGACY_BIG_NUMBERS = "legacy-big-numbers"; + private static final String LEGACY_EXTERNALIZABLE = "legacy-externalizable"; + private static final String ALLOW_XML_DOCTYPE_DECLARATION = "allow-xml-doctype-declaration"; + private static final String ALLOW_XML_EXTERNAL_ENTITY_EXPANSION = "allow-xml-external-entity-expansion"; + + private static final String LOG_PROPERTY_ERRORS = "log-property-errors"; + private static final String IGNORE_PROPERTY_ERRORS = "ignore-property-errors"; + private static final String INCLUDE_READ_ONLY = "include-read-only"; + private static final String GLOBAL_INCLUDE_READ_ONLY = "global-include-read-only"; + private static final String FLEX_CLIENT_OUTBOUND_QUEUE_PROCESSOR = "flex-client-outbound-queue-processor"; + private static final String SHOW_STACKTRACES = "show-stacktraces"; + private static final String MAX_OBJECT_NEST_LEVEL = "max-object-nest-level"; + private static final String MAX_COLLECTION_NEST_LEVEL = "max-collection-nest-level"; + private static final String PREFER_VECTORS = "prefer-vectors"; + + // Endpoint properties + protected Set<String> clientLoadBalancingUrls; + protected String clientType; + protected int connectTimeoutSeconds; + protected int requestTimeoutSeconds; + protected FlexClientOutboundQueueProcessor flexClientOutboundQueueProcessor; + protected SerializationContext serializationContext; + protected Class<?> deserializerClass; + protected Class<?> serializerClass; + protected TypeMarshaller typeMarshaller; + protected int port; + private SecurityConstraint securityConstraint; + protected String url; + protected boolean recordMessageSizes; + protected boolean recordMessageTimes; + protected boolean remote; + protected Server server; + protected boolean serverOnly; + + // Endpoint internal + protected String parsedUrl; + // Keeps track of what context path parsedUrl has been parsed for. If it is + // null, means parsedUrl has not been parsed already. + protected String parsedForContext; + protected boolean clientContextParsed; + protected String parsedClientUrl; + protected Logger log; + + protected Class<?> flexClientOutboundQueueProcessClass; + protected ConfigMap flexClientOutboundQueueProcessorConfig; + + // Supported messaging version + protected double messagingVersion = 1.0; + + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * Constructs an unmanaged <code>AbstractEndpoint</code>. + */ + public AbstractEndpoint() + { + this(false); + } + + /** + * Constructs an <code>AbstractEndpoint</code> with the indicated management. + * + * @param enableManagement <code>true</code> if the <code>AbstractEndpoint</code> + * is manageable; <code>false</code> otherwise. + */ + public AbstractEndpoint(boolean enableManagement) + { + super(enableManagement); + this.log = Log.getLogger(getLogCategory()); + serializationContext = new SerializationContext(); + } + + //-------------------------------------------------------------------------- + // + // Initialize, validate, start, and stop methods. + // + //-------------------------------------------------------------------------- + + /** + * Initializes the <code>Endpoint</code> with the properties. + * If subclasses override this method, they must call <code>super.initialize()</code>. + * + * @param id The ID of the <code>Endpoint</code>. + * @param properties Properties for the <code>Endpoint</code>. + */ + @Override + public void initialize(String id, ConfigMap properties) + { + super.initialize(id, properties); + + if (properties == null || properties.size() == 0) + return; + + // Client-targeted <client-load-balancing> + initializeClientLoadBalancing(id, properties); + + // Client-targeted <connect-timeout-seconds/> + connectTimeoutSeconds = properties.getPropertyAsInt(CONNECT_TIMEOUT_SECONDS_ELEMENT, 0); + + // Client-targeted <request-timeout-seconds/> + requestTimeoutSeconds = properties.getPropertyAsInt(REQUEST_TIMEOUT_SECONDS_ELEMENT, 0); + + // Check for a custom FlexClient outbound queue processor. + ConfigMap outboundQueueConfig = properties.getPropertyAsMap(FLEX_CLIENT_OUTBOUND_QUEUE_PROCESSOR, null); + if (outboundQueueConfig != null) + { + // Get nested props for the processor. + flexClientOutboundQueueProcessorConfig = outboundQueueConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null); + + String pClassName = outboundQueueConfig.getPropertyAsString(CLASS_ATTR, null); + if (pClassName != null) + { + try + { + flexClientOutboundQueueProcessClass = createClass(pClassName); + // And now create an instance and initialize to make sure the properties are valid. + setFlexClientOutboundQueueProcessorConfig(flexClientOutboundQueueProcessorConfig); + } + catch (Throwable t) + { + if (Log.isWarn()) + log.warn("Cannot register custom FlexClient outbound queue processor class {0}", new Object[]{pClassName}, t); + } + } + } + + ConfigMap serialization = properties.getPropertyAsMap(SERIALIZATION, null); + if (serialization != null) + { + // Custom deserializers + List<?> deserializers = serialization.getPropertyAsList(CUSTOM_DESERIALIZER, null); + if (deserializers != null && Log.isWarn()) + log.warn("Endpoint <custom-deserializer> functionality is no longer available. Please remove this entry from your configuration."); + + // Custom serializers + List<?> serializers = serialization.getPropertyAsList(CUSTOM_SERIALIZER, null); + if (serializers != null && Log.isWarn()) + log.warn("Endpoint <custom-serializer> functionality is no longer available. Please remove this entry from your configuration."); + + // Type Marshaller implementation + String typeMarshallerClassName = serialization.getPropertyAsString(TYPE_MARSHALLER, null); + if (typeMarshallerClassName != null && typeMarshallerClassName.length() > 0) + { + try + { + Class<?> tmc = createClass(typeMarshallerClassName); + typeMarshaller = (TypeMarshaller)ClassUtil.createDefaultInstance(tmc, TypeMarshaller.class); + } + catch (Throwable t) + { + if (Log.isWarn()) + log.warn("Cannot register custom type marshaller for type {0}", new Object[]{typeMarshallerClassName}, t); + } + } + + // Boolean Serialization Flags + serializationContext.createASObjectForMissingType = serialization.getPropertyAsBoolean(CREATE_ASOBJECT_FOR_MISSING_TYPE, false); + serializationContext.enableSmallMessages = serialization.getPropertyAsBoolean(ENABLE_SMALL_MESSAGES, true); + serializationContext.instantiateTypes = serialization.getPropertyAsBoolean(INSTANTIATE_TYPES, true); + serializationContext.supportRemoteClass = serialization.getPropertyAsBoolean(SUPPORT_REMOTE_CLASS, false); + serializationContext.legacyCollection = serialization.getPropertyAsBoolean(LEGACY_COLLECTION, false); + serializationContext.legacyDictionary = serialization.getPropertyAsBoolean(LEGACY_DICTIONARY, false); + serializationContext.legacyMap = serialization.getPropertyAsBoolean(LEGACY_MAP, false); + serializationContext.legacyXMLDocument = serialization.getPropertyAsBoolean(LEGACY_XML, false); + serializationContext.legacyXMLNamespaces = serialization.getPropertyAsBoolean(LEGACY_XML_NAMESPACES, false); + serializationContext.legacyThrowable = serialization.getPropertyAsBoolean(LEGACY_THROWABLE, false); + serializationContext.legacyBigNumbers = serialization.getPropertyAsBoolean(LEGACY_BIG_NUMBERS, false); + serializationContext.legacyExternalizable = serialization.getPropertyAsBoolean(LEGACY_EXTERNALIZABLE, false); + serializationContext.allowXmlDoctypeDeclaration = serialization.getPropertyAsBoolean(ALLOW_XML_DOCTYPE_DECLARATION, false); + serializationContext.allowXmlExternalEntityExpansion = serialization.getPropertyAsBoolean(ALLOW_XML_EXTERNAL_ENTITY_EXPANSION, false); + serializationContext.maxObjectNestLevel = (int)serialization.getPropertyAsLong(MAX_OBJECT_NEST_LEVEL, 512); + serializationContext.maxCollectionNestLevel = (int)serialization.getPropertyAsLong(MAX_COLLECTION_NEST_LEVEL, 15); + serializationContext.preferVectors = serialization.getPropertyAsBoolean(PREFER_VECTORS, false); + + boolean showStacktraces = serialization.getPropertyAsBoolean(SHOW_STACKTRACES, false); + if (showStacktraces && Log.isWarn()) + log.warn("The " + SHOW_STACKTRACES + " configuration option is deprecated and non-functional. Please remove this from your configuration file."); + serializationContext.restoreReferences = serialization.getPropertyAsBoolean(RESTORE_REFERENCES, false); + serializationContext.logPropertyErrors = serialization.getPropertyAsBoolean(LOG_PROPERTY_ERRORS, false); + serializationContext.ignorePropertyErrors = serialization.getPropertyAsBoolean(IGNORE_PROPERTY_ERRORS, true); + serializationContext.includeReadOnly = serialization.getPropertyAsBoolean(INCLUDE_READ_ONLY, false); + } + + recordMessageSizes = properties.getPropertyAsBoolean(ConfigurationConstants.RECORD_MESSAGE_SIZES_ELEMENT, false); + + if (recordMessageSizes && Log.isWarn()) + log.warn("Setting <record-message-sizes> to true affects application performance and should only be used for debugging"); + + recordMessageTimes = properties.getPropertyAsBoolean(ConfigurationConstants.RECORD_MESSAGE_TIMES_ELEMENT, false); + } + + /** + * Starts the endpoint if its associated <code>MessageBroker</code> is started, + * and if the endpoint is not already running. If subclasses override this method, + * they must call <code>super.start()</code>. + */ + @Override + public void start() + { + if (isStarted()) + return; + + // Check if the MessageBroker is started + MessageBroker broker = getMessageBroker(); + if (!broker.isStarted()) + { + if (Log.isWarn()) + { + Log.getLogger(getLogCategory()).warn("Endpoint with id '{0}' cannot be started" + + " when the MessageBroker is not started.", + new Object[]{getId()}); + } + return; + } + + // Set up management + if (isManaged() && broker.isManaged()) + { + setupEndpointControl(broker); + MessageBrokerControl controller = (MessageBrokerControl)broker.getControl(); + if (getControl() != null) + controller.addEndpoint(this); + } + + // Setup Deserializer and Serializer for the SerializationContext + if (deserializerClass == null) + deserializerClass = createClass(getDeserializerClassName()); + + if (serializerClass == null) + serializerClass = createClass(getSerializerClassName()); + + serializationContext.setDeserializerClass(deserializerClass); + serializationContext.setSerializerClass(serializerClass); + + // Setup endpoint features + ClassAliasRegistry registry = ClassAliasRegistry.getRegistry(); + registry.registerAlias(AsyncMessageExt.CLASS_ALIAS, AsyncMessageExt.class.getName()); + registry.registerAlias(AcknowledgeMessageExt.CLASS_ALIAS, AcknowledgeMessageExt.class.getName()); + registry.registerAlias(CommandMessageExt.CLASS_ALIAS, CommandMessageExt.class.getName()); + super.start(); + } + + /** + * Stops the endpoint if it is running. If subclasses override this method, they must + * call <code>super.stop()</code>. + */ + @Override + public void stop() + { + if (!isStarted()) + return; + + super.stop(); + + // Remove management + if (isManaged() && getMessageBroker().isManaged()) + { + if (getControl() != null) + { + getControl().unregister(); + setControl(null); + } + setManaged(false); + } + } + + //-------------------------------------------------------------------------- + // + // Public Getters and Setters for AbstractEndpoint properties + // + //-------------------------------------------------------------------------- + + /** + * Adds a client-load-balancing URL. + * + * @param url A client-load-balancing URL. + * @return <code>false</code> if the set already contains the URL, <code>true</code> otherwise. + * + */ + public boolean addClientLoadBalancingUrl(String url) + { + if (clientLoadBalancingUrls == null) + clientLoadBalancingUrls = new HashSet<String>(); + + if (url == null || url.length() == 0) + { + // Invalid {0} configuration for endpoint ''{1}''; cannot add empty url. + ConfigurationException ce = new ConfigurationException(); + ce.setMessage(ERR_MSG_EMTPY_CLIENT_LOAD_BALACNING_URL, new Object[]{CLIENT_LOAD_BALANCING_ELEMENT, getId()}); + throw ce; + } + + return clientLoadBalancingUrls.add(url); + } + + /** + * Removes the client-load-balancing URL. + * + * @param url The URL to remove. + * @return <code>true</code> if the set contained the URL, <code>false</code> otherwise. + */ + public boolean removeClientLoadBalancingUrl(String url) + { + if (clientLoadBalancingUrls != null) + return clientLoadBalancingUrls.remove(url); + return false; + } + + /** + * Retrieves a snapshot of the current client-load-balancing URLs. + * + * @return A snapshot of the current client-load-balancing URLs, or <code>null</code> if + * no URL exists. + */ + public Set<String> getClientLoadBalancingUrls() + { + return clientLoadBalancingUrls == null? null : new HashSet<String>(clientLoadBalancingUrls); + } + + /** + * Retrieves the corresponding client channel type for the endpoint. + * + * @return The corresponding client channel type for the endpoint. + */ + public String getClientType() + { + return clientType; + } + + /** + * Sets the corresponding client channel type for the endpoint. + * + * @param type The corresponding client channel type for the endpoint. + */ + public void setClientType(String type) + { + this.clientType = type; + } + + /** + * Retrieves the <code>FlexClientOutboundQueueProcessorClass</code> of the endpoint. + * + * @return The <code>FlexClientOutboundQueueProcessorClass</code> of the endpoint. + */ + public Class<?> getFlexClientOutboundQueueProcessorClass() + { + return flexClientOutboundQueueProcessClass; + } + + /** + * Sets the the <code>FlexClientOutboundQueueProcessor</code> of the endpoint. + * + * @param flexClientOutboundQueueProcessorClass the Class of the Flex client outbound queue processor. + */ + public void setFlexClientOutboundQueueProcessorClass(Class<?> flexClientOutboundQueueProcessorClass) + { + this.flexClientOutboundQueueProcessClass = flexClientOutboundQueueProcessorClass; + if (flexClientOutboundQueueProcessClass != null && flexClientOutboundQueueProcessorConfig != null) + { + FlexClientOutboundQueueProcessor processor = (FlexClientOutboundQueueProcessor)ClassUtil.createDefaultInstance(flexClientOutboundQueueProcessClass, null); + processor.initialize(flexClientOutboundQueueProcessorConfig); + } + } + + /** + * Retrieves the properties for the <code>FlexClientOutboundQueueProcessor</code> of the endpoint. + * + * @return The properties for the <code>FlexClientOutboundQueueProcessor</code> of the endpoint. + */ + public ConfigMap getFlexClientOutboundQueueProcessorConfig() + { + return flexClientOutboundQueueProcessorConfig; + } + + /** + * Sets the properties for the <code>FlexClientOutboundQueueProcessor</code> of the endpoint. + * + * @param flexClientOutboundQueueProcessorConfig The configuration map. + */ + public void setFlexClientOutboundQueueProcessorConfig(ConfigMap flexClientOutboundQueueProcessorConfig) + { + this.flexClientOutboundQueueProcessorConfig = flexClientOutboundQueueProcessorConfig; + if (flexClientOutboundQueueProcessorConfig != null && flexClientOutboundQueueProcessClass != null) + { + FlexClientOutboundQueueProcessor processor = (FlexClientOutboundQueueProcessor)ClassUtil.createDefaultInstance(flexClientOutboundQueueProcessClass, null); + processor.initialize(flexClientOutboundQueueProcessorConfig); + } + } + + /** + * Sets the ID of the <code>AbstractEndpoint</code>. If the <code>AbstractEndpoint</code> + * has a <code>MessageBroker</code> assigned, it also updates the ID in the + * <code>MessageBroker</code>. + * + * @param id The endpoint ID. + */ + @Override + public void setId(String id) + { + String oldId = getId(); + + if (oldId != null && oldId.equals(id)) + return; + + super.setId(id); + + // Update the endpoint id in the broker + MessageBroker broker = getMessageBroker(); + if (broker != null) + { + // broker must have the endpoint then + broker.removeEndpoint(oldId); + broker.addEndpoint(this); + } + } + + /** + * Retrieves the <code>MessageBroker</code> of the <code>AbstractEndpoint</code>. + * + * @return The <code>MessageBroker</code> of the <code>AbstractEndpoint</code>. + */ + public MessageBroker getMessageBroker() + { + return (MessageBroker)getParent(); + } + + /** + * Sets the <code>MessageBroker</code> of the <code>AbstractEndpoint</code>. + * Removes the <code>AbstractEndpoint</code> from the old broker + * (if there was one) and adds to the list of endpoints in the new broker. + * + * @param broker The <code>MessageBroker</code> of the <code>AbstractEndpoint</code>. + */ + public void setMessageBroker(MessageBroker broker) + { + MessageBroker oldBroker = getMessageBroker(); + + setParent(broker); + + if (oldBroker != null) + oldBroker.removeEndpoint(getId()); + + // Add endpoint to the new broker if needed + if (broker.getEndpoint(getId()) != this) + broker.addEndpoint(this); + } + + /** + * Return the highest messaging version currently available via this + * endpoint. + * @return double the messaging version + */ + public double getMessagingVersion() + { + return messagingVersion; + } + + /** + * Retrieves the port of the URL of the endpoint. + * A return value of 0 denotes no port in the channel URL. + * + * @return The port of the URL of the endpoint, or 0 if the URL does not contain + * a port number. + */ + public int getPort() + { + return port; + } + + /** + * Determines whether the endpoint is secure. + * + * @return <code>false</code> by default. + */ + public boolean isSecure() + { + return false; + } + + /** + * Determines if the endpoint clients connect to directly is mirrored and running + * on a remote host, in which case this local instance is not started and will service no direct + * client connections. + * + * @return <code>true</code> if this endpoint will not process direct client connections and is just + * a local representation of a symmetric endpoint on a remote host that will, <code>false</code> otherwise. + */ + public boolean isRemote() + { + return remote; + } + + /** + * Sets the remote status for this endpoint. + * + * @param value <code>true</code> if this endpoint will not process direct client connections and is just + * a local representation of a symmetric endpoint on a remote host that will, <code>false</code> otherwise. + */ + public void setRemote(boolean value) + { + remote = value; + } + + /** + * Retrieves the <tt>Server</tt> that the endpoint is using, or <code>null</code> if + * no server has been assigned. + * @return Server The Server object the endpoint is using. + */ + public Server getServer() + { + return server; + } + + /** + * Sets the <tt>Server</tt> that the endpoint will use. + * @param server The Server object. + */ + public void setServer(Server server) + { + this.server = server; + } + + /** + * Determines whether the endpoint is server only. + * + * @return <code>true</code> if the endpoint is server only, <code>false</code> otherwise. + */ + public boolean getServerOnly() + { + return serverOnly; + } + + /** + * Sets whether the endpoint is server only. + * + * @param serverOnly <code>true</code> if the endpoint is server only, <code>false</code> otherwise. + */ + public void setServerOnly(boolean serverOnly) + { + this.serverOnly = serverOnly; + } + + /** + * Retrieves the <code>SecurityConstraint</code> of the <code>Endpoint</code>. + * + * @return The <code>SecurityConstraint</code> of the <code>Endpoint</code>. + */ + public SecurityConstraint getSecurityConstraint() + { + return securityConstraint; + } + + /** + * Sets the <code>SecurityConstraint</code> of the <code>Endpoint</code>. + * + * @param securityConstraint The SecurityContraint object. + */ + public void setSecurityConstraint(SecurityConstraint securityConstraint) + { + this.securityConstraint = securityConstraint; + } + + /** + * Retrieves the <code>SerializationContext</code> of the endpoint. + * + * @return The <code>SerializationContext</code> of the endpoint. + */ + public SerializationContext getSerializationContext() + { + return serializationContext; + } + + /** + * Sets the <code>SerializationContext</code> of the endpoint. + * + * @param serializationContext The SerializationContext object. + */ + public void setSerializationContext(SerializationContext serializationContext) + { + this.serializationContext = serializationContext; + } + + /** + * Retrieves the <code>TypeMarshaller</code> of the endpoint. + * + * @return The <code>TypeMarshaller</code> of the endpoint. + */ + public TypeMarshaller getTypeMarshaller() + { + if (typeMarshaller == null) + typeMarshaller = new ASTranslator(); + + return typeMarshaller; + } + + /** + * Sets the <code>TypeMarshaller</code> of the endpoint. + * + * @param typeMarshaller The TypeMarshaller object. + */ + public void setTypeMarshaller(TypeMarshaller typeMarshaller) + { + this.typeMarshaller = typeMarshaller; + } + + /** + * Retrieves the URL of the endpoint. + * + * @return The URL of the endpoint. + */ + public String getUrl() + { + return url; + } + + /** + * Sets the URL of the endpoint. + * + * @param url The URL of the endpoint. + */ + public void setUrl(String url) + { + this.url = url; + port = internalParsePort(url); + parsedForContext = null; + clientContextParsed = false; + } + + /** + * + * Returns the url of the endpoint parsed for the client. + * + * @return The url of the endpoint parsed for the client. + */ + public String getUrlForClient() + { + if (!clientContextParsed) + { + HttpServletRequest req = FlexContext.getHttpRequest(); + if (req != null) + { + String contextPath = req.getContextPath(); + parseClientUrl(contextPath); + } + else + { + return url; + } + } + return parsedClientUrl; + } + + /** + * + * Returns the total throughput for the endpoint. + * + * @return The total throughput for the endpoint. + */ + public long getThroughput() + { + EndpointControl control = (EndpointControl)getControl(); + + return control.getBytesDeserialized().longValue() + control.getBytesSerialized().longValue(); + } + + //-------------------------------------------------------------------------- + // + // Other Public APIs + // + //-------------------------------------------------------------------------- + + + public static void addNoCacheHeaders(HttpServletRequest req, HttpServletResponse res) + { + String userAgent = req.getHeader(UserAgentManager.USER_AGENT_HEADER_NAME); + + // For MSIE over HTTPS, set additional Cache-Control values. + if (req.isSecure() && userAgent != null && userAgent.indexOf(UserAgentSettings.USER_AGENT_MSIE) != -1) + res.addHeader(HEADER_NAME_CACHE_CONTROL, "no-store, no-cache, must-revalidate, post-check=0, pre-check=0, no-transform, private"); + else // For the rest, set no-cache header value only. + res.addHeader(HEADER_NAME_CACHE_CONTROL, "no-cache"); + + // Set an expiration date in the past as well. + res.setDateHeader(HEADER_NAME_EXPIRES, 946080000000L); //Approx Jan 1, 2000 + + // Set Pragma no-cache header if we're not MSIE over HTTPS + if (!(req.isSecure() && userAgent != null && userAgent.indexOf(UserAgentSettings.USER_AGENT_MSIE) != -1)) + res.setHeader(HEADER_NAME_PRAGMA, "no-cache"); + } + + /** + * + */ + public Message convertToSmallMessage(Message message) + { + if (message instanceof SmallMessage) + { + Message smallMessage = ((SmallMessage)message).getSmallMessage(); + if (smallMessage != null) + message = smallMessage; + } + + return message; + } + + /** + * Retrieves a <code>ConfigMap</code> of the endpoint properties the client + * needs. Subclasses should add additional properties to <code>super.describeDestination</code>, + * or return <code>null</code> if they must not send their properties to the client. + * + * @return ConfigMap The ConfigMap object. + */ + public ConfigMap describeEndpoint() + { + ConfigMap channelConfig = new ConfigMap(); + + if (serverOnly) // Client does not need server only endpoints. + return channelConfig; + + channelConfig.addProperty(ID_ATTR, getId()); + channelConfig.addProperty(TYPE_ATTR, getClientType()); + + ConfigMap properties = new ConfigMap(); + + boolean containsClientLoadBalancing = clientLoadBalancingUrls != null && !clientLoadBalancingUrls.isEmpty(); + if (containsClientLoadBalancing) + { + ConfigMap clientLoadBalancing = new ConfigMap(); + for (Iterator<String> iterator = clientLoadBalancingUrls.iterator(); iterator.hasNext();) + { + ConfigMap url = new ConfigMap(); + // Adding as a value rather than attribute to the parent. + url.addProperty(EMPTY_STRING, iterator.next()); + clientLoadBalancing.addProperty(URL_ATTR, url); + } + properties.addProperty(CLIENT_LOAD_BALANCING_ELEMENT, clientLoadBalancing); + } + + // Add endpoint uri only if no client-load-balancing urls are defined. + if (!containsClientLoadBalancing) + { + ConfigMap endpointConfig = new ConfigMap(); + endpointConfig.addProperty(URI_ATTR, getUrlForClient()); + channelConfig.addProperty(ENDPOINT_ELEMENT, endpointConfig); + } + + if (connectTimeoutSeconds > 0) + { + ConfigMap connectTimeoutConfig = new ConfigMap(); + connectTimeoutConfig.addProperty(EMPTY_STRING, String.valueOf(connectTimeoutSeconds)); + properties.addProperty(CONNECT_TIMEOUT_SECONDS_ELEMENT, connectTimeoutConfig); + } + + if (requestTimeoutSeconds > 0) + { + ConfigMap requestTimeoutSeconds = new ConfigMap(); + requestTimeoutSeconds.addProperty(EMPTY_STRING, String.valueOf(requestTimeoutSeconds)); + properties.addProperty(REQUEST_TIMEOUT_SECONDS_ELEMENT, requestTimeoutSeconds); + } + + if (recordMessageTimes) + { + ConfigMap recordMessageTimesMap = new ConfigMap(); + // Adding as a value rather than attribute to the parent + recordMessageTimesMap.addProperty(EMPTY_STRING, TRUE_STRING); + properties.addProperty(RECORD_MESSAGE_TIMES_ELEMENT, recordMessageTimesMap); + } + + if (recordMessageSizes) + { + ConfigMap recordMessageSizesMap = new ConfigMap(); + // Adding as a value rather than attribute to the parent + recordMessageSizesMap.addProperty(EMPTY_STRING, TRUE_STRING); + properties.addProperty(RECORD_MESSAGE_SIZES_ELEMENT, recordMessageSizesMap); + } + + ConfigMap serialization = new ConfigMap(); + serialization.addProperty(ENABLE_SMALL_MESSAGES_ELEMENT, Boolean.toString(serializationContext.enableSmallMessages)); + properties.addProperty(SERIALIZATION_ELEMENT, serialization); + + if (properties.size() > 0) + channelConfig.addProperty(PROPERTIES_ELEMENT, properties); + + return channelConfig; + } + + /** + * + * Make sure this matches with ChannelSettings.getParsedUri. + */ + public String getParsedUrl(String contextPath) + { + parseUrl(contextPath); + return parsedUrl; + } + + /** + * + */ + public void handleClientMessagingVersion(Number version) + { + if (version != null) + { + boolean clientSupportsSmallMessages = version.doubleValue() >= messagingVersion; + if (clientSupportsSmallMessages && getSerializationContext().enableSmallMessages) + { + FlexSession session = FlexContext.getFlexSession(); + if (session != null) + session.setUseSmallMessages(true); + } + } + } + + /** + * Default implementation of the Endpoint <code>service</code> method. + * Subclasses should call <code>super.service</code> before their custom + * code. + * + * @param req The HttpServletRequest object. + * @param res The HttpServletResponse object. + */ + public void service(HttpServletRequest req, HttpServletResponse res) + { + validateRequestProtocol(req); + } + + /** + * Typically invoked by subclasses, this method transforms decoded message data + * into the appropriate Message object and routes the Message to the endpoint's broker. + * + * @param message The decoded message data. + * @return Message The transformed message. + */ + public Message serviceMessage(Message message) + { + if (isManaged()) + { + ((EndpointControl) getControl()).incrementServiceMessageCount(); + } + + try + { + FlexContext.setThreadLocalEndpoint(this); + Message ack = null; + + // Make sure this message is timestamped. + if (message.getTimestamp() == 0) + { + message.setTimestamp(System.currentTimeMillis()); + } + + // Reset the endpoint header for inbound messages to the id for this endpoint + // to guarantee that it's correct. Don't allow clients to spoof this. + // However, if the endpoint id is passed as null we need to tag the message to + // skip channel/endpoint validation at the destination level (MessageBroker.inspectChannel()). + if (message.getHeader(Message.ENDPOINT_HEADER) != null) + message.setHeader(Message.VALIDATE_ENDPOINT_HEADER, Boolean.TRUE); + message.setHeader(Message.ENDPOINT_HEADER, getId()); + + if (message instanceof CommandMessage) + { + CommandMessage command = (CommandMessage)message; + + // Apply channel endpoint level constraint; always allow login commands through. + int operation = command.getOperation(); + if (operation != CommandMessage.LOGIN_OPERATION) + checkSecurityConstraint(message); + + // Handle general (not Consumer specific) poll requests here. + // We need to fetch all outbound messages for client subscriptions over this endpoint. + // We identify these general poll messages by their operation and a null clientId. + if (operation == CommandMessage.POLL_OPERATION && message.getClientId() == null) + { + verifyFlexClientSupport(command); + + + FlexClient flexClient = FlexContext.getFlexClient(); + ack = handleFlexClientPollCommand(flexClient, command); + } + else if (operation == CommandMessage.DISCONNECT_OPERATION) + { + ack = handleChannelDisconnect(command); + } + else if (operation == CommandMessage.TRIGGER_CONNECT_OPERATION) + { + ack = new AcknowledgeMessage(); + ((AcknowledgeMessage)ack).setCorrelationId(message.getMessageId()); + + boolean needsConfig = false; + if (command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER) != null) + needsConfig = ((Boolean)(command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER))); + + // Send configuration information only if the client requested. + if (needsConfig) + { + ConfigMap serverConfig = getMessageBroker().describeServices(this); + if (serverConfig.size() > 0) + ack.setBody(serverConfig); + } + } + else + { + // Block a subset of commands for legacy clients that need to be recompiled to + // interop with a 2.5+ server. + if (operation == CommandMessage.SUBSCRIBE_OPERATION || operation == CommandMessage.POLL_OPERATION) + verifyFlexClientSupport(command); + + ack = getMessageBroker().routeCommandToService((CommandMessage) message, this); + + // Look for client advertised features on initial connect. + if (operation == CommandMessage.CLIENT_PING_OPERATION || operation == CommandMessage.LOGIN_OPERATION) + { + Number clientVersion = (Number)command.getHeader(CommandMessage.MESSAGING_VERSION); + handleClientMessagingVersion(clientVersion); + + // Also respond by advertising the messaging version on the + // acknowledgement. + ack.setHeader(CommandMessage.MESSAGING_VERSION, new Double(messagingVersion)); + } + } + } + else + { + // Block any AsyncMessages from a legacy client. + if (message instanceof AsyncMessage) + verifyFlexClientSupport(message); + + // Apply channel endpoint level constraint. + checkSecurityConstraint(message); + + ack = getMessageBroker().routeMessageToService(message, this); + } + + return ack; + } + finally + { + FlexContext.setThreadLocalEndpoint(null); + } + } + + /** + * Utility method that endpoint implementations (or associated classes) + * should invoke when they receive an incoming message from a client but before + * servicing it. This method looks up or creates the proper FlexClient instance + * based upon the client the message came from and places it in the FlexContext. + * + * @param message The incoming message to process. + * + * @return The FlexClient, or <code>null</code> if the message did not contain a FlexClient ID value. + */ + public FlexClient setupFlexClient(Message message) + { + FlexClient flexClient = null; + if (message.getHeaders().containsKey(Message.FLEX_CLIENT_ID_HEADER)) + { + String id = (String)message.getHeaders().get(Message.FLEX_CLIENT_ID_HEADER); + // If the id is null, reset to the special token value that let's us differentiate + // between legacy clients and 2.5+ clients. + if (id == null) + id = FlexClient.NULL_FLEXCLIENT_ID; + flexClient = setupFlexClient(id); + } + return flexClient; + } + + /** + * Utility method that endpoint implementations (or associated classes) + * should invoke when they receive an incoming message from a client but before + * servicing it. This method looks up or creates the proper FlexClient instance + * based upon the FlexClient ID value received from the client. + * It also associates this FlexClient instance with the current FlexSession. + * + * @param id The FlexClient ID value from the client. + * + * @return The FlexClient or null if the provided ID was <code>null</code>. + */ + public FlexClient setupFlexClient(String id) + { + FlexClient flexClient = null; + if (id != null) + { + // This indicates that we're dealing with a non-legacy client that hasn't been + // assigned a FlexClient Id yet. Reset to null to generate a fresh Id. + if (id.equals(FlexClient.NULL_FLEXCLIENT_ID)) + id = null; + + flexClient = getMessageBroker().getFlexClientManager().getFlexClient(id); + // Make sure the FlexClient and FlexSession are associated. + FlexSession session = FlexContext.getFlexSession(); + flexClient.registerFlexSession(session); + // And place the FlexClient in FlexContext for this request. + FlexContext.setThreadLocalFlexClient(flexClient); + } + return flexClient; + } + + /** + * + * Performance metrics gathering property + */ + public boolean isRecordMessageSizes() + { + return recordMessageSizes; + } + + /** + * + * Performance metrics gathering property + */ + public boolean isRecordMessageTimes() + { + return recordMessageTimes; + } + + /** + * + */ + public void setThreadLocals() + { + if (serializationContext != null) + { + SerializationContext context = (SerializationContext)serializationContext.clone(); + // Get the latest deserialization validator from the broker. + MessageBroker broker = getMessageBroker(); + DeserializationValidator validator = broker == null? null : broker.getDeserializationValidator(); + context.setDeserializationValidator(validator); + SerializationContext.setSerializationContext(context); + } + + TypeMarshallingContext.setTypeMarshaller(getTypeMarshaller()); + } + + /** + * + */ + public void clearThreadLocals() + { + SerializationContext.clearThreadLocalObjects(); + TypeMarshallingContext.clearThreadLocalObjects(); + } + + //-------------------------------------------------------------------------- + // + // Protected/private methods. + // + //-------------------------------------------------------------------------- + + /** + * Returns the log category of the <code>AbstractEndpoint</code>. Subclasses + * can override to provide a more specific logging category. + * + * @return The log category. + */ + @Override + protected String getLogCategory() + { + return LOG_CATEGORY; + } + + /** + * Hook method invoked when a disconnect command is received from a client channel. + * The response returned by this method is not guaranteed to get to the client, which + * is free to terminate its physical connection at any point. + * + * @param disconnectCommand The disconnect command. + * @return The response; by default an empty <tt>AcknowledgeMessage</tt>. + */ + protected Message handleChannelDisconnect(CommandMessage disconnectCommand) + { + return new AcknowledgeMessage(); + } + + /** + * Hook method for varying poll reply strategies for synchronous endpoints. + * The default behavior performs a non-waited, synchronous poll for the FlexClient + * and if any messages are currently queued they are returned immediately. If no + * messages are queued an empty response is returned immediately. + * + * @param flexClient The FlexClient that issued the poll request. + * @param pollCommand The poll command from the client. + * @return The FlushResult response. + */ + protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand) + { + return flexClient.poll(getId()); + } + + /** + * Handles a general poll request from a FlexClient to this endpoint. + * Subclasses may override to implement different poll handling strategies. + * + * @param flexClient The FlexClient that issued the poll request. + * @param pollCommand The poll command from the client. + * @return The poll response message; either for success or fault. + */ + protected Message handleFlexClientPollCommand(FlexClient flexClient, CommandMessage pollCommand) + { + if (Log.isDebug()) + Log.getLogger(getMessageBroker().getLogCategory(pollCommand)).debug( + "Before handling general client poll request. " + StringUtils.NEWLINE + + " incomingMessage: " + pollCommand + StringUtils.NEWLINE); + + FlushResult flushResult = handleFlexClientPoll(flexClient, pollCommand); + Message pollResponse = null; + + // Generate a no-op poll response if necessary; prevents a single client from busy polling when the server + // is doing wait()-based long-polls. + if ((flushResult instanceof PollFlushResult) && ((PollFlushResult)flushResult).isClientProcessingSuppressed()) + { + pollResponse = new CommandMessage(CommandMessage.CLIENT_SYNC_OPERATION); + pollResponse.setHeader(CommandMessage.NO_OP_POLL_HEADER, Boolean.TRUE); + } + + if (pollResponse == null) + { + List<Message> messagesToReturn = (flushResult != null) ? flushResult.getMessages() : null; + if (messagesToReturn != null && !messagesToReturn.isEmpty()) + { + pollResponse = new CommandMessage(CommandMessage.CLIENT_SYNC_OPERATION); + pollResponse.setBody(messagesToReturn.toArray()); + } + else + { + pollResponse = new AcknowledgeMessage(); + } + } + + // Set the adaptive poll wait time if necessary. + if (flushResult != null) + { + int nextFlushWaitTime = flushResult.getNextFlushWaitTimeMillis(); + if (nextFlushWaitTime > 0) + pollResponse.setHeader(CommandMessage.POLL_WAIT_HEADER, new Integer(nextFlushWaitTime)); + } + + if (Log.isDebug()) + { + String debugPollResult = Log.getPrettyPrinter().prettify(pollResponse); + Log.getLogger(getMessageBroker().getLogCategory(pollCommand)).debug( + "After handling general client poll request. " + StringUtils.NEWLINE + + " reply: " + debugPollResult + StringUtils.NEWLINE); + } + + return pollResponse; + } + + /** + * Initializes the <code>Endpoint</code> with the client-load-balancing urls. + * + * @param id Id of the <code>Endpoint</code>. + * @param properties Properties for the <code>Endpoint</code>. + */ + + protected void initializeClientLoadBalancing(String id, ConfigMap properties) + { + if (!properties.containsKey(CLIENT_LOAD_BALANCING_ELEMENT)) + return; + + ConfigMap clientLoadBalancing = properties.getPropertyAsMap(CLIENT_LOAD_BALANCING_ELEMENT, null); + if (clientLoadBalancing == null) + { + // Invalid {0} configuration for endpoint ''{1}''; no urls defined. + ConfigurationException ce = new ConfigurationException(); + ce.setMessage(ERR_MSG_EMPTY_CLIENT_LOAD_BALANCING_ELEMENT, new Object[]{CLIENT_LOAD_BALANCING_ELEMENT, getId()}); + throw ce; + } + + @SuppressWarnings("unchecked") + List<String> urls = clientLoadBalancing.getPropertyAsList(URL_ATTR, null); + if (urls == null || urls.isEmpty()) + { + // Invalid {0} configuration for endpoint ''{1}''; no urls defined. + ConfigurationException ce = new ConfigurationException(); + ce.setMessage(ERR_MSG_EMPTY_CLIENT_LOAD_BALANCING_ELEMENT, new Object[]{CLIENT_LOAD_BALANCING_ELEMENT, getId()}); + throw ce; + } + + for (Iterator<String> iterator = urls.iterator(); iterator.hasNext();) + { + String url = iterator.next(); + if (!addClientLoadBalancingUrl(url) && Log.isWarn()) + log.warn("Endpoint '{0}' is ignoring the url '{1}' as it's already in the set of client-load-balancing urls.", new Object[]{id, url}); + } + } + + protected void checkSecurityConstraint(Message message) + { + if (securityConstraint != null) + { + getMessageBroker().getLoginManager().checkConstraint(securityConstraint); + } + } + + /** + * Returns the deserializer class name used by the endpoint. + * + * @return The deserializer class name used by the endpoint. + */ + protected abstract String getDeserializerClassName(); + + /** + * Returns the serializer class name used by the endpoint. + * + * @return The serializer class name used by the endpoint. + */ + protected abstract String getSerializerClassName(); + + /** + * Returns the secure protocol scheme for the endpoint. + * + * @return The secure protocol scheme for the endpoint. + */ + protected abstract String getSecureProtocolScheme(); + + /** + * Returns the insecure protocol scheme for the endpoint. + * + * @return The insecure protocol scheme for the endpoint. + */ + protected abstract String getInsecureProtocolScheme(); + + /** + * Invoked automatically to allow the <code>AbstractEndpoint</code> to setup + * its corresponding MBean control. Subclasses should override to setup and + * register their MBean control. Manageable subclasses should override this + * template method. + * + * @param broker The <code>MessageBroker</code> that manages this + * <code>AbstractEndpoint</code>. + */ + protected abstract void setupEndpointControl(MessageBroker broker); + + /** + * Validates the endpoint url scheme. + */ + protected void validateEndpointProtocol() + { + String scheme = isSecure()? getSecureProtocolScheme() : getInsecureProtocolScheme(); + if (!url.startsWith(scheme)) + { + ConfigurationException ce = new ConfigurationException(); + ce.setMessage(ERR_MSG_INVALID_URL_SCHEME, new Object[] {url, scheme}); + throw ce; + } + } + + protected void validateRequestProtocol(HttpServletRequest req) + { + // Secure url can talk to secure or non-secure endpoint. + // Non-secure url can only talk to non-secure endpoint. + boolean secure = req.isSecure(); + if (!secure && isSecure()) + { + // Secure endpoints must be contacted via a secure protocol. + String endpointPath = req.getServletPath() + req.getPathInfo(); + SecurityException se = new SecurityException(); + se.setMessage(NONSECURE_PROTOCOL, new Object[]{endpointPath}); + throw se; + } + } + + /** + * + * Verifies that the remote client supports the FlexClient API. + * Legacy clients that do not support this receive a message fault for any messages they send. + * + * @param message The message to verify. + */ + protected void verifyFlexClientSupport(Message message) + { + if (FlexContext.getFlexClient() == null) + { + MessageException me = new MessageException(); + me.setMessage(REQUIRES_FLEXCLIENT_SUPPORT, new Object[] {message.getDestination()}); + throw me; + } + } + + /** + * + */ + protected Class<?> createClass(String className) + { + return ClassUtil.createClass(className, FlexContext.getMessageBroker() == null ? null : + FlexContext.getMessageBroker().getClassLoader()); + } + + // This should match with ChannelSetting.parseClientUri + private void parseClientUrl(String contextPath) + { + if (!clientContextParsed) + { + String channelEndpoint = url.trim(); + + // either {context-root} or {context.root} is legal + channelEndpoint = StringUtils.substitute(channelEndpoint, "{context-root}", ConfigurationConstants.CONTEXT_PATH_TOKEN); + + if ((contextPath == null) && (channelEndpoint.indexOf(ConfigurationConstants.CONTEXT_PATH_TOKEN) != -1)) + { + // context root must be specified before it is used + ConfigurationException e = new ConfigurationException(); + e.setMessage(ConfigurationConstants.UNDEFINED_CONTEXT_ROOT, new Object[]{getId()}); + throw e; + } + + // simplify the number of combinations to test by ensuring our + // context path always starts with a slash + if (contextPath != null && !contextPath.startsWith("/")) + { + contextPath = "/" + contextPath; + } + + // avoid double-slashes from context root by replacing /{context.root} + // in a single replacement step + if (channelEndpoint.indexOf(ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN) != -1) + { + // but avoid double-slash for /{context.root}/etc when we have + // the default context root + if ("/".equals(contextPath) && !ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN.equals(channelEndpoint)) + contextPath = ""; + + channelEndpoint = StringUtils.substitute(channelEndpoint, ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN, contextPath); + } + // otherwise we have something like {server.name}:{server.port}{context.root}... + else + { + // but avoid double-slash for {context.root}/etc when we have + // the default context root + if ("/".equals(contextPath) && !ConfigurationConstants.CONTEXT_PATH_TOKEN.equals(channelEndpoint)) + contextPath = ""; + + channelEndpoint = StringUtils.substitute(channelEndpoint, ConfigurationConstants.CONTEXT_PATH_TOKEN, contextPath); + } + + parsedClientUrl = channelEndpoint; + clientContextParsed = true; + } + } + + private int internalParsePort(String url) + { + int port = ChannelSettings.parsePort(url); + // If there is no specified port, log an info message as urls without ports are supported + if (port == 0 && Log.isInfo()) + log.info("No port specified in channel URL: {0}", new Object[]{url}); + + return port == -1? 0 : port; // Replace -1 with 0. + } + + private void parseUrl(String contextPath) + { + // Parse again only if never parsed before or parsed for a different contextPath. + if (parsedForContext == null || !parsedForContext.equals(contextPath)) + { + String channelEndpoint = url.toLowerCase().trim(); + + // Remove protocol and host info + String insecureProtocol = getInsecureProtocolScheme() + "://"; + String secureProtocol = getSecureProtocolScheme() + "://"; + if (channelEndpoint.startsWith(secureProtocol) || channelEndpoint.startsWith(insecureProtocol)) + { + int nextSlash = channelEndpoint.indexOf('/', 8); + if (nextSlash > 0) + { + channelEndpoint = channelEndpoint.substring(nextSlash); + } + } + + // either {context-root} or {context.root} is legal + channelEndpoint = StringUtils.substitute(channelEndpoint, "{context-root}", ConfigurationConstants.CONTEXT_PATH_TOKEN); + + // Remove context path info + if (channelEndpoint.startsWith(ConfigurationConstants.CONTEXT_PATH_TOKEN)) + { + channelEndpoint = channelEndpoint.substring(ConfigurationConstants.CONTEXT_PATH_TOKEN.length()); + } + else if (channelEndpoint.startsWith(ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN)) + { + channelEndpoint = channelEndpoint.substring(ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN.length()); + } + else if (contextPath.length() > 0) + { + if (channelEndpoint.startsWith(contextPath.toLowerCase())) + { + channelEndpoint = channelEndpoint.substring(contextPath.length()); + } + } + + // We also don't match on trailing slashes + if (channelEndpoint.endsWith("/")) + { + channelEndpoint = channelEndpoint.substring(0, channelEndpoint.length() - 1); + } + + parsedUrl = channelEndpoint; + parsedForContext = contextPath; + } + } +}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java b/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java new file mode 100644 index 0000000..bb31b39 --- /dev/null +++ b/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java @@ -0,0 +1,636 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.endpoints; + +import flex.management.runtime.messaging.endpoints.EndpointControl; +import flex.messaging.FlexContext; +import flex.messaging.FlexSession; +import flex.messaging.HttpFlexSession; +import flex.messaging.MessageClient; +import flex.messaging.client.FlexClient; +import flex.messaging.config.ConfigMap; +import flex.messaging.config.ConfigurationConstants; +import flex.messaging.endpoints.amf.AMFFilter; +import flex.messaging.io.MessageIOConstants; +import flex.messaging.io.amf.ActionContext; +import flex.messaging.log.HTTPRequestLog; +import flex.messaging.messages.CommandMessage; +import flex.messaging.messages.Message; +import flex.messaging.util.SettingsReplaceUtil; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract base class for all the HTTP-based endpoints. + */ +public abstract class BaseHTTPEndpoint extends AbstractEndpoint +{ + //-------------------------------------------------------------------------- + // + // Public Static Constants + // + //-------------------------------------------------------------------------- + + /** + * The secure and insecure URL schemes for the HTTP endpoint. + */ + public static final String HTTP_PROTOCOL_SCHEME = "http"; + public static final String HTTPS_PROTOCOL_SCHEME = "https"; + + //-------------------------------------------------------------------------- + // + // Private Static Constants + // + //-------------------------------------------------------------------------- + + private static final String ADD_NO_CACHE_HEADERS = "add-no-cache-headers"; + private static final String REDIRECT_URL = "redirect-url"; + private static final String INVALIDATE_SESSION_ON_DISCONNECT = "invalidate-session-on-disconnect"; + private static final String HTTP_RESPONSE_HEADERS = "http-response-headers"; + private static final String HEADER_ATTR = "header"; + + private static final String HEADER_NAME_ORIGIN = "Origin"; + private static final String ACCESS_CONTROL = "Access-Control-"; + private static final String SESSION_REWRITING_ENABLED = "session-rewriting-enabled"; + + private static final int ERR_MSG_DUPLICATE_SESSIONS_DETECTED = 10035; + private static final String REQUEST_ATTR_DUPLICATE_SESSION_FLAG = "flex.messaging.request.DuplicateSessionDetected"; + + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * Constructs an unmanaged <code>BaseHTTPEndpoint</code>. + */ + public BaseHTTPEndpoint() + { + this(false); + } + + /** + * Constructs a <code>BaseHTTPEndpoint</code> with the specified management setting. + * + * @param enableManagement <code>true</code> if the <code>BaseHTTPEndpoint</code> + * is manageable; otherwise <code>false</code>. + */ + public BaseHTTPEndpoint(boolean enableManagement) + { + super(enableManagement); + } + + //-------------------------------------------------------------------------- + // + // Initialize, validate, start, and stop methods. + // + //-------------------------------------------------------------------------- + + /** + * Initializes the <code>Endpoint</code> with the properties. + * If subclasses override this method, they must call <code>super.initialize()</code>. + * + * @param id The ID of the <code>Endpoint</code>. + * @param properties Properties for the <code>Endpoint</code>. + */ + @Override public void initialize(String id, ConfigMap properties) + { + super.initialize(id, properties); + + if (properties == null || properties.size() == 0) + return; + + // General HTTP props. + addNoCacheHeaders = properties.getPropertyAsBoolean(ADD_NO_CACHE_HEADERS, true); + redirectURL = properties.getPropertyAsString(REDIRECT_URL, null); + invalidateSessionOnDisconnect = properties.getPropertyAsBoolean(INVALIDATE_SESSION_ON_DISCONNECT, false); + loginAfterDisconnect = properties.getPropertyAsBoolean(ConfigurationConstants.LOGIN_AFTER_DISCONNECT_ELEMENT, false); + sessionRewritingEnabled = properties.getPropertyAsBoolean(SESSION_REWRITING_ENABLED, true); + initializeHttpResponseHeaders(properties); + validateEndpointProtocol(); + } + + /** + * Starts the <code>Endpoint</code> by creating a filter chain and setting + * up serializers and deserializers. + */ + @Override public void start() + { + if (isStarted()) + return; + + super.start(); + + filterChain = createFilterChain(); + } + + //-------------------------------------------------------------------------- + // + // Variables + // + //-------------------------------------------------------------------------- + + /** + * Controller used to manage this endpoint. + */ + protected EndpointControl controller; + + /** + * AMF processing filter chain used by this endpoint. + */ + protected AMFFilter filterChain; + + /** + * Headers to add to the HTTP response. + */ + protected List<HttpHeader> httpResponseHeaders; + + //-------------------------------------------------------------------------- + // + // Properties + // + //-------------------------------------------------------------------------- + + //---------------------------------- + // addNoCacheHeaders + //---------------------------------- + + protected boolean addNoCacheHeaders = true; + + /** + * Retrieves the <code>add-no-cache-headers</code> property. + * + * @return <code>true</code> if <code>add-no-cache-headers</code> is enabled; + * <code>false</code> otherwise. + */ + public boolean isAddNoCacheHeaders() + { + return addNoCacheHeaders; + } + + /** + * Sets the <code>add-no-cache-headers</code> property. + * + * @param addNoCacheHeaders The <code>add-no-cache-headers</code> property. + */ + public void setAddNoCacheHeaders(boolean addNoCacheHeaders) + { + this.addNoCacheHeaders = addNoCacheHeaders; + } + + //---------------------------------- + // loginAfterDisconnect + //---------------------------------- + + /** + * + * This is a property used on the client. + */ + protected boolean loginAfterDisconnect; + + //---------------------------------- + // invalidateSessionOnDisconnect + //---------------------------------- + + protected boolean invalidateSessionOnDisconnect; + + /** + * Indicates whether the server session will be invalidated + * when a client channel disconnects. + * The default is <code>false</code>. + * + * @return <code>true</code> if the server session will be invalidated + * when a client channel disconnects, <code>false</code> otherwise. + */ + public boolean isInvalidateSessionOnDisconnect() + { + return invalidateSessionOnDisconnect; + } + + /** + * Determines whether to invalidate the server session for a client + * that disconnects its channel. + * The default is <code>false</code>. + * + * @param value <code>true</code> to invalidate the server session for a client + * that disconnects its channel, <code>false</code> otherwise. + */ + public void setInvalidateSessionOnDisconnect(boolean value) + { + invalidateSessionOnDisconnect = value; + } + + //---------------------------------- + // redirectURL + //---------------------------------- + + protected String redirectURL; + + /** + * Retrieves the <code>redirect-url</code> property. + * + * @return The <code>redirect-url</code> property. + */ + public String getRedirectURL() + { + return redirectURL; + } + + /** + * Sets the <code>redirect-url</code> property. + * + * @param redirectURL The <code>redirect-url</code> property. + */ + public void setRedirectURL(String redirectURL) + { + this.redirectURL = redirectURL; + } + + //---------------------------------- + // sessionRewritingEnabled + //---------------------------------- + + protected boolean sessionRewritingEnabled = true; + + /** + * Indicates whether the server will fall back on rewriting URLs to include + * session identifiers in the URL when HTTP session cookies are not allowed + * on the client. The default is <code>true</code>. + * + * @return <code>true</code> if the session rewriting is enabled. + */ + public boolean isSessionRewritingEnabled() + { + return sessionRewritingEnabled; + } + + /** + * Sets whether the session rewriting is enabled. + * + * @param value The session writing enabled value. + */ + public void setSessionRewritingEnabled(boolean value) + { + sessionRewritingEnabled = value; + } + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Handle AMF/AMFX encoded messages sent over HTTP. + * + * @param req The original servlet request. + * @param res The active servlet response. + */ + @Override + public void service(HttpServletRequest req, HttpServletResponse res) + { + super.service(req, res); + + try + { + // Setup serialization and type marshalling contexts + setThreadLocals(); + + // Create a context for this request + ActionContext context = new ActionContext(); + + // Pass endpoint's mpi settings to the context so that it knows what level of + // performance metrics should be gathered during serialization/deserialization + context.setRecordMessageSizes(isRecordMessageSizes()); + context.setRecordMessageTimes(isRecordMessageTimes()); + + // Send invocation through filter chain, which ends at the MessageBroker + filterChain.invoke(context); + + // After serialization completes, increment endpoint byte counters, + // if the endpoint is managed + if (isManaged()) + { + controller.addToBytesDeserialized(context.getDeserializedBytes()); + controller.addToBytesSerialized(context.getSerializedBytes()); + } + + if (context.getStatus() != MessageIOConstants.STATUS_NOTAMF) + { + if (addNoCacheHeaders) + addNoCacheHeaders(req, res); + + addHeadersToResponse(req, res); + + ByteArrayOutputStream outBuffer = context.getResponseOutput(); + + res.setContentType(getResponseContentType()); + + res.setContentLength(outBuffer.size()); + outBuffer.writeTo(res.getOutputStream()); + res.flushBuffer(); + } + else + { + // Not an AMF request, probably viewed in a browser + if (redirectURL != null) + { + try + { + //Check for redirect URL context-root token + redirectURL = SettingsReplaceUtil.replaceContextPath(redirectURL, req.getContextPath()); + res.sendRedirect(redirectURL); + } + catch (IllegalStateException alreadyFlushed) + { + // ignore + } + } + } + } + catch (IOException ioe) + { + // This happens when client closes the connection, log it at info level + log.info(ioe.getMessage()); + // Store exception information for latter logging + req.setAttribute(HTTPRequestLog.HTTP_ERROR_INFO, ioe.toString()); + } + catch (Throwable t) + { + log.error(t.getMessage(), t); + // Store exception information for latter logging + req.setAttribute(HTTPRequestLog.HTTP_ERROR_INFO, t.toString()); + } + finally + { + clearThreadLocals(); + } + } + + + /** + * + * Returns a <code>ConfigMap</code> of endpoint properties that the client + * needs. This includes properties from <code>super.describeEndpoint</code> + * and additional <code>BaseHTTPEndpoint</code> specific properties under + * "properties" key. + */ + @Override + public ConfigMap describeEndpoint() + { + ConfigMap endpointConfig = super.describeEndpoint(); + + if (loginAfterDisconnect) + { + ConfigMap loginAfterDisconnect = new ConfigMap(); + // Adding as a value rather than attribute to the parent + loginAfterDisconnect.addProperty(EMPTY_STRING, TRUE_STRING); + + ConfigMap properties = endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null); + if (properties == null) + { + properties = new ConfigMap(); + endpointConfig.addProperty(PROPERTIES_ELEMENT, properties); + } + properties.addProperty(ConfigurationConstants.LOGIN_AFTER_DISCONNECT_ELEMENT, loginAfterDisconnect); + } + + return endpointConfig; + } + + /** + * Overrides to guard against duplicate HTTP-based sessions for the same FlexClient + * which will occur if the remote host has disabled session cookies. + * + * @see AbstractEndpoint#setupFlexClient(String) + */ + @Override + public FlexClient setupFlexClient(String id) + { + FlexClient flexClient = super.setupFlexClient(id); + + // Scan for duplicate HTTP-sessions and if found, invalidate them and throw a MessageException. + // A request attribute is used to deal with batched AMF messages that arrive in a single request by trigger multiple passes through this method. + boolean duplicateSessionDetected = (FlexContext.getHttpRequest().getAttribute(REQUEST_ATTR_DUPLICATE_SESSION_FLAG) != null); + + List<FlexSession> sessions = null; + if (!duplicateSessionDetected) + { + sessions = flexClient.getFlexSessions(); + int n = sessions.size(); + if (n > 1) + { + List<HttpFlexSession> httpFlexSessions = new ArrayList<HttpFlexSession>(); + for (int i = 0; i < n; i++) + { + FlexSession currentSession = sessions.get(i); + if (currentSession instanceof HttpFlexSession) + httpFlexSessions.add((HttpFlexSession)currentSession); + if (httpFlexSessions.size() > 1) + { + FlexContext.getHttpRequest().setAttribute(REQUEST_ATTR_DUPLICATE_SESSION_FLAG, httpFlexSessions); + duplicateSessionDetected = true; + break; + } + } + } + } + + // If more than one was found, remote host isn't using session cookies. Kill all duplicate sessions and return an error. + // Simplest to just re-scan the list given that it will be very short, but use an iterator for concurrent modification. + if (duplicateSessionDetected) + { + Object attributeValue = FlexContext.getHttpRequest().getAttribute(REQUEST_ATTR_DUPLICATE_SESSION_FLAG); + String newSessionId = null; + String oldSessionId = null; + if (attributeValue != null) + { + @SuppressWarnings("unchecked") + List<HttpFlexSession> httpFlexSessions = (List<HttpFlexSession>)attributeValue; + oldSessionId = httpFlexSessions.get(0).getId(); + newSessionId = httpFlexSessions.get(1).getId(); + } + + if (sessions != null) + { + for (FlexSession session : sessions) + { + if (session instanceof HttpFlexSession) + { + session.invalidate(); + } + } + } + + // Return an error to the client. + + DuplicateSessionException e = new DuplicateSessionException(); + // Duplicate HTTP-based FlexSession error: A request for FlexClient ''{0}'' arrived over a new FlexSession ''{1}'', but FlexClient is already associated with FlexSession ''{2}'', therefore it cannot be associated with the new session. + e.setMessage(ERR_MSG_DUPLICATE_SESSIONS_DETECTED, new Object[]{flexClient.getId(), newSessionId, oldSessionId}); + throw e; + } + + return flexClient; + } + + //-------------------------------------------------------------------------- + // + // Protected Methods + // + //-------------------------------------------------------------------------- + + /** + * Adds custom headers specified in the config to the HTTP response. The only + * exception is that access control headers (Access-Control-*) are sent only + * if there is an Origin header in the request. + * + * @param request The HTTP request. + * @param response The HTTP response. + */ + protected void addHeadersToResponse(HttpServletRequest request, HttpServletResponse response) + { + if (httpResponseHeaders == null || httpResponseHeaders.isEmpty()) + return; + + String origin = request.getHeader(HEADER_NAME_ORIGIN); + boolean originHeaderExists = origin != null && origin.length() != 0; + + for (HttpHeader header : httpResponseHeaders) + { + if (header.name.startsWith(ACCESS_CONTROL) && !originHeaderExists) + continue; + + response.addHeader(header.name, header.value); + } + } + + /** + * Create the gateway filters that transform action requests + * and responses. + */ + protected abstract AMFFilter createFilterChain(); + + /** + * Returns the content type used by the connection handler to set on the + * HTTP response. Subclasses should either return MessageIOConstants.AMF_CONTENT_TYPE + * or MessageIOConstants.XML_CONTENT_TYPE. + */ + protected abstract String getResponseContentType(); + + /** + * Returns https which is the secure protocol scheme for the endpoint. + * + * @return https. + */ + @Override protected String getSecureProtocolScheme() + { + return HTTPS_PROTOCOL_SCHEME; + } + + /** + * Returns http which is the insecure protocol scheme for the endpoint. + * + * @return http. + */ + @Override protected String getInsecureProtocolScheme() + { + return HTTP_PROTOCOL_SCHEME; + } + + /** + * @see flex.messaging.endpoints.AbstractEndpoint#handleChannelDisconnect(CommandMessage) + */ + @Override protected Message handleChannelDisconnect(CommandMessage disconnectCommand) + { + HttpFlexSession session = (HttpFlexSession)FlexContext.getFlexSession(); + FlexClient flexClient = FlexContext.getFlexClient(); + + // Shut down any subscriptions established over this channel/endpoint + // for this specific FlexClient. + if (flexClient.isValid()) + { + String endpointId = getId(); + List<MessageClient> messageClients = flexClient.getMessageClients(); + for (MessageClient messageClient : messageClients) + { + if (messageClient.getEndpointId().equals(endpointId)) + { + messageClient.setClientChannelDisconnected(true); + messageClient.invalidate(); + } + } + } + + // And optionally invalidate the session. + if (session.isValid() && isInvalidateSessionOnDisconnect()) + session.invalidate(false /* don't recreate */); + + return super.handleChannelDisconnect(disconnectCommand); + } + + protected void initializeHttpResponseHeaders(ConfigMap properties) + { + if (!properties.containsKey(HTTP_RESPONSE_HEADERS)) + return; + + ConfigMap httpResponseHeaders = properties.getPropertyAsMap(HTTP_RESPONSE_HEADERS, null); + if (httpResponseHeaders == null) + return; + + @SuppressWarnings("unchecked") + List<String> headers = httpResponseHeaders.getPropertyAsList(HEADER_ATTR, null); + if (headers == null || headers.isEmpty()) + return; + + if (this.httpResponseHeaders == null) + this.httpResponseHeaders = new ArrayList<HttpHeader>(); + + for (String header : headers) + { + int colonIndex = header.indexOf(":"); + String name = header.substring(0, colonIndex).trim(); + String value = header.substring(colonIndex + 1).trim(); + this.httpResponseHeaders.add(new HttpHeader(name, value)); + } + } + + //-------------------------------------------------------------------------- + // + // Nested Classes + // + //-------------------------------------------------------------------------- + + /** + * Helper class used for headers in the HTTP request/response. + */ + static class HttpHeader + { + public HttpHeader(String name, String value) + { + this.name = name; + this.value = value; + } + public final String name; + public final String value; + } +} \ No newline at end of file
