http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/Endpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/Endpoint.java b/core/src/flex/messaging/endpoints/Endpoint.java deleted file mode 100644 index 3e1f7de..0000000 --- a/core/src/flex/messaging/endpoints/Endpoint.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.endpoints; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import flex.management.Manageable; -import flex.messaging.MessageBroker; -import flex.messaging.config.ConfigMap; -import flex.messaging.config.SecurityConstraint; - -/** - * An endpoint receives messages from clients and decodes them, - * then sends them on to a MessageBroker for routing to a service. - * The endpoint also encodes messages and delivers them to clients. - * Endpoints are specific to a message format and network transport, - * and are defined by the named URI path on which they are located. 
- */ -public interface Endpoint extends Manageable -{ - /** - * Initializes the endpoint with an ID and properties. - * - * @param id The ID of the endpoint. - * @param properties Properties of the endpoint. - */ - void initialize(String id, ConfigMap properties); - - /** - * Start the endpoint. The MessageBroker invokes this - * method in order to set the endpoint up for sending and receiving - * messages from Flash clients. - * - */ - void start(); - - /** - * Determines whether the endpoint is started. - * - * @return <code>true</code> if the endpoint is started; <code>false</code> otherwise. - */ - boolean isStarted(); - - /** - * Stop and destroy the endpoint. The MessageBroker invokes - * this method in order to stop the endpoint from sending - * and receiving messages from Flash clients. - * - */ - void stop(); - - /** - * Retrieves the corresponding client channel type for the endpoint. - * - * @return The corresponding client channel type for the endpoint. - */ - String getClientType(); - - /** - * Sets the corresponding client channel type for the endpoint. - * - * @param clientType The corresponding client channel type for the endpoint. - */ - void setClientType(String clientType); - - /** - * Retrieves the endpoint properties the client needs. - * @return The endpoint properties the client needs. - */ - ConfigMap describeEndpoint(); - - /** - * All endpoints are referenceable by an ID that is unique among - * all the endpoints registered to a single broker instance. - * @return The endpoint ID. - */ - String getId(); - - /** - * All endpoints are referenceable by an ID that is unique among - * all the endpoints registered to a single broker instance. The id - * is set through this method, usually through parsed configuration. - * - * @param id The endpoint ID. - */ - void setId(String id); - - /** - * All endpoints must be managed by a single MessageBroker, - * and must be capable of returning a reference to that broker. 
- * This broker reference is used when the endpoint wishes to - * send a message to one of the broker's services. - * - * @return The MessageBroker instance which manages this endpoint. - */ - MessageBroker getMessageBroker(); - - /** - * Sets the <code>MessageBroker</code> of the endpoint. - * - * @param broker The message broker object. - */ - void setMessageBroker(MessageBroker broker); - - /** - * Retrieves the highest messaging version currently available via this - * endpoint. - * @return The messaging version number. - */ - double getMessagingVersion(); - - /** - * Presumably returns the endpoint URL resolved against the supplied context - * path; the exact parsing rules live in implementations not shown here. - * TODO confirm against the implementing classes. - * - * @param contextPath The context path to use when parsing the URL. - * @return The parsed URL. - */ - String getParsedUrl(String contextPath); - - /** - * Retrieves the port of the URL of the endpoint. - * - * @return The port of the URL of the endpoint. - */ - int getPort(); - - /** - * Specifies whether this endpoint requires the secure HTTPS protocol. - * @return <code>true</code> if the endpoint is a secure endpoint, <code>false</code> otherwise. - */ - boolean isSecure(); - - /** - * Retrieves the security constraint of the endpoint. - * - * @return The security constraint of the endpoint. - */ - SecurityConstraint getSecurityConstraint(); - - /** - * Sets the security constraint of the endpoint. - * - * @param constraint The security constraint of the endpoint. - */ - void setSecurityConstraint(SecurityConstraint constraint); - - /** - * Responds to HTTP-based messages published by a client. Endpoints which - * do not support access over HTTP should throw an UnsupportedOperationException - * in the implementation of this method. - * - * @param req The HttpServletRequest object. - * @param res The HttpServletResponse object. - */ - void service(HttpServletRequest req, HttpServletResponse res); - - /** - * Retrieves the URL of the endpoint. - * - * @return The URL of the endpoint. - */ - String getUrl(); - - /** - * Sets the URL of the endpoint. - * - * @param url The URL of the endpoint. - */ - void setUrl(String url); - - /** - * - * Returns the URL of the endpoint parsed for the client. 
- * - * @return The URL of the endpoint parsed for the client. - */ - String getUrlForClient(); -}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/Endpoint2.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/Endpoint2.java b/core/src/flex/messaging/endpoints/Endpoint2.java deleted file mode 100644 index 8f10ebe..0000000 --- a/core/src/flex/messaging/endpoints/Endpoint2.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.endpoints; - -import flex.messaging.Server; - -/** - * Extension interface for <tt>Endpoint</tt> that adds support for a - * referenced <tt>Server</tt> that the endpoint may use. - */ -public interface Endpoint2 extends Endpoint -{ - /** - * Retrieves the <tt>Server</tt> that the endpoint is using; <code>null</code> if - * no server has been assigned. - * - * @return The <tt>Server</tt> in use, or <code>null</code> if none has been assigned. - */ - Server getServer(); - - /** - * Sets the <tt>Server</tt> that the endpoint will use. - * - * @param server The <tt>Server</tt> that the endpoint will use. 
- */ - void setServer(Server server); -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/HTTPEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/HTTPEndpoint.java b/core/src/flex/messaging/endpoints/HTTPEndpoint.java deleted file mode 100644 index 82e358f..0000000 --- a/core/src/flex/messaging/endpoints/HTTPEndpoint.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging.endpoints; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import flex.management.runtime.messaging.endpoints.HTTPEndpointControl; -import flex.messaging.MessageBroker; -import flex.messaging.endpoints.amf.AMFFilter; -import flex.messaging.endpoints.amf.BatchProcessFilter; -import flex.messaging.endpoints.amf.MessageBrokerFilter; -import flex.messaging.endpoints.amf.SerializationFilter; -import flex.messaging.endpoints.amf.SessionFilter; -import flex.messaging.io.MessageIOConstants; -import flex.messaging.log.LogCategories; -import flex.messaging.messages.Message; -import flex.messaging.security.SecurityException; - -/** - * This class replaces Flex 1.5's ProxyServlet by splitting - * the proxy's functionality into two pieces. Requests for proxied HTTP - * content can now be sent using a message type via any channel. - * The message broker directs requests to the appropriate service, - * in Flex 1.5 terms, the Proxy Service. The response from the proxy - * request is streamed back to the client. - * - */ -public class HTTPEndpoint extends BasePollingHTTPEndpoint -{ - public static final String LOG_CATEGORY = LogCategories.ENDPOINT_HTTP; - - private static final int IMPROPER_CONTENT_TYPE = 10068; - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>HTTPEndpoint</code>. - */ - public HTTPEndpoint() - { - this(false); - } - - /** - * Constructs a <code>HTTPEndpoint</code> with the indicated management. - * - * @param enableManagement <code>true</code> if the <code>HTTPEndpoint</code> - * is manageable; <code>false</code> otherwise. - */ - public HTTPEndpoint(boolean enableManagement) - { - super(enableManagement); - } - - /** - * Currently this override is a no-op to disable small messages over HTTP - * endpoints. 
- */ - @Override public Message convertToSmallMessage(Message message) - { - return message; - } - - /** - * Rejects requests whose content type is present but not exactly equal to - * <code>MessageIOConstants.XML_CONTENT_TYPE</code>; a request with no content - * type is accepted. Accepted requests are delegated to the superclass. - * - * @param req The servlet request. - * @param res The servlet response. - * @throws SecurityException If the request carries any other content type. - */ - @Override public void service(HttpServletRequest req, HttpServletResponse res) - { - String contentType = req.getContentType(); - boolean xmlContentType = contentType == null || contentType.equals(MessageIOConstants.XML_CONTENT_TYPE); - if (!xmlContentType) - { - // HTTP endpoint ''{0}'' must be contacted via a HTTP request with proper content type. - SecurityException se = new SecurityException(); - se.setMessage(IMPROPER_CONTENT_TYPE, new Object[]{id}); - throw se; - } - - super.service(req, res); - } - - //-------------------------------------------------------------------------- - // - // Protected/Private Methods - // - //-------------------------------------------------------------------------- - - /** - * Builds a new filter chain on every call: serialization, batch, an optional - * session filter (only when session rewriting is enabled), then message broker. - * - * @return The head of the chain (the serialization filter). - */ - @Override protected AMFFilter createFilterChain() - { - AMFFilter serializationFilter = new SerializationFilter(getLogCategory()); - AMFFilter batchFilter = new BatchProcessFilter(); - AMFFilter sessionFilter = sessionRewritingEnabled? new SessionFilter() : null; - AMFFilter messageBrokerFilter = new MessageBrokerFilter(this); - - serializationFilter.setNext(batchFilter); - if (sessionFilter != null) - { - batchFilter.setNext(sessionFilter); - sessionFilter.setNext(messageBrokerFilter); - } - else - { - batchFilter.setNext(messageBrokerFilter); - } - - return serializationFilter; - } - - /** - * Returns MessageIOConstants.XML_CONTENT_TYPE. - */ - @Override protected String getResponseContentType() - { - return MessageIOConstants.XML_CONTENT_TYPE; - } - - /** - * Returns the log category of the endpoint. - * - * @return The log category of the endpoint. 
- */ - @Override protected String getLogCategory() - { - return LOG_CATEGORY; - } - - /** - * Returns the deserializer class name used by the endpoint. - * - * @return The deserializer class name used by the endpoint. - */ - @Override protected String getDeserializerClassName() - { - return "flex.messaging.io.amfx.AmfxMessageDeserializer"; - } - - /** - * Returns the serializer class name used by the endpoint. - * - * @return The serializer class name used by the endpoint. - */ - @Override protected String getSerializerClassName() - { - return "flex.messaging.io.amfx.AmfxMessageSerializer"; - } - - /** - * Invoked automatically to allow the <code>HTTPEndpoint</code> to set up its - * corresponding MBean control. - * - * @param broker The <code>MessageBroker</code> that manages this - * <code>HTTPEndpoint</code>. - */ - @Override protected void setupEndpointControl(MessageBroker broker) - { - controller = new HTTPEndpointControl(this, broker.getControl()); - controller.register(); - setControl(controller); - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/SecureAMFEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/SecureAMFEndpoint.java b/core/src/flex/messaging/endpoints/SecureAMFEndpoint.java deleted file mode 100644 index 7b3a44f..0000000 --- a/core/src/flex/messaging/endpoints/SecureAMFEndpoint.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.endpoints; - -/** - * Secure version of AMFEndpoint. - */ -public class SecureAMFEndpoint extends AMFEndpoint -{ - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>SecureAMFEndpoint</code>. - */ - public SecureAMFEndpoint() - { - this(false); - } - - /** - * Constructs a <code>SecureAMFEndpoint</code> with the indicated management. - * - * @param enableManagement <code>true</code> if the <code>SecureAMFEndpoint</code> - * is manageable; <code>false</code> otherwise. - */ - public SecureAMFEndpoint(boolean enableManagement) - { - super(enableManagement); - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Reports this endpoint as secure. - * - * @return Always <code>true</code> for this endpoint type. 
- */ - public boolean isSecure() - { - return true; - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/SecureHTTPEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/SecureHTTPEndpoint.java b/core/src/flex/messaging/endpoints/SecureHTTPEndpoint.java deleted file mode 100644 index 1f08a63..0000000 --- a/core/src/flex/messaging/endpoints/SecureHTTPEndpoint.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.endpoints; - -/** - * Secure version of HTTPEndpoint. - */ -public class SecureHTTPEndpoint extends HTTPEndpoint -{ - //-------------------------------------------------------------------------- - // - // Constructors - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>SecureHTTPEndpoint</code>. - */ - public SecureHTTPEndpoint() - { - this(false); - } - - /** - * Constructs a <code>SecureHTTPEndpoint</code> with the indicated management. 
- * - * @param enableManagement <code>true</code> if the <code>SecureHTTPEndpoint</code> - * is manageable; <code>false</code> otherwise. - */ - public SecureHTTPEndpoint(boolean enableManagement) - { - super(enableManagement); - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Reports this endpoint as secure. - * - * @return Always <code>true</code> for this endpoint type. - */ - public boolean isSecure() - { - return true; - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/SecureStreamingAMFEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/SecureStreamingAMFEndpoint.java b/core/src/flex/messaging/endpoints/SecureStreamingAMFEndpoint.java deleted file mode 100644 index dd38d97..0000000 --- a/core/src/flex/messaging/endpoints/SecureStreamingAMFEndpoint.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging.endpoints; - -/** - * Secure version of StreamingAMFEndpoint. - */ -public class SecureStreamingAMFEndpoint extends StreamingAMFEndpoint -{ - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>SecureStreamingAMFEndpoint</code>. - */ - public SecureStreamingAMFEndpoint() - { - this(false); - } - - /** - * Constructs a <code>SecureStreamingAMFEndpoint</code> with the indicated management. - * - * @param enableManagement <code>true</code> if the <code>SecureStreamingAMFEndpoint</code> - * is manageable; <code>false</code> otherwise. - */ - public SecureStreamingAMFEndpoint(boolean enableManagement) - { - super(enableManagement); - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Reports this endpoint as secure. - * - * @return Always <code>true</code> for this endpoint type. - */ - public boolean isSecure() - { - return true; - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/SecureStreamingHTTPEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/SecureStreamingHTTPEndpoint.java b/core/src/flex/messaging/endpoints/SecureStreamingHTTPEndpoint.java deleted file mode 100644 index d053a4d..0000000 --- a/core/src/flex/messaging/endpoints/SecureStreamingHTTPEndpoint.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.endpoints; - -/** - * Secure version of StreamingHTTPEndpoint. - */ -public class SecureStreamingHTTPEndpoint extends StreamingHTTPEndpoint -{ - //-------------------------------------------------------------------------- - // - // Constructors - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>SecureStreamingHTTPEndpoint</code>. - */ - public SecureStreamingHTTPEndpoint() - { - this(false); - } - - /** - * Constructs a <code>SecureStreamingHTTPEndpoint</code> with the indicated management. - * - * @param enableManagement <code>true</code> if the <code>SecureStreamingHTTPEndpoint</code> - * is manageable; <code>false</code> otherwise. - */ - public SecureStreamingHTTPEndpoint(boolean enableManagement) - { - super(enableManagement); - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Reports this endpoint as secure. - * - * @return Always <code>true</code> for this endpoint type. 
- */ - public boolean isSecure() - { - return true; - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/StreamingAMFEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/StreamingAMFEndpoint.java b/core/src/flex/messaging/endpoints/StreamingAMFEndpoint.java deleted file mode 100644 index 7c5e02a..0000000 --- a/core/src/flex/messaging/endpoints/StreamingAMFEndpoint.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging.endpoints; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -import javax.servlet.ServletOutputStream; -import javax.servlet.http.HttpServletResponse; - -import flex.management.runtime.messaging.endpoints.StreamingAMFEndpointControl; -import flex.messaging.MessageBroker; -import flex.messaging.endpoints.amf.AMFFilter; -import flex.messaging.endpoints.amf.BatchProcessFilter; -import flex.messaging.endpoints.amf.LegacyFilter; -import flex.messaging.endpoints.amf.MessageBrokerFilter; -import flex.messaging.endpoints.amf.SerializationFilter; -import flex.messaging.endpoints.amf.SessionFilter; -import flex.messaging.io.MessageIOConstants; -import flex.messaging.io.TypeMarshallingContext; -import flex.messaging.io.amf.Amf3Output; -import flex.messaging.log.Log; -import flex.messaging.log.LogCategories; -import flex.messaging.messages.Message; - -/** - * Extension to the AMFEndpoint to support streaming HTTP connections to connected - * clients. - * Each streaming connection managed by this endpoint consumes one of the request - * handler threads provided by the servlet container, so it is not highly scalable - * but offers performance advantages over client polling for clients receiving a steady, - * rapid stream of pushed messages. - * This endpoint does not support polling clients and will fault any poll requests - * that are received. To support polling clients use AMFEndpoint instead. - */ -public class StreamingAMFEndpoint extends BaseStreamingHTTPEndpoint -{ - //-------------------------------------------------------------------------- - // - // Public Static Constants - // - //-------------------------------------------------------------------------- - - /** - * The log category for this endpoint. 
- */ - public static final String LOG_CATEGORY = LogCategories.ENDPOINT_STREAMING_AMF; - - //-------------------------------------------------------------------------- - // - // Private Static Constants - // - //-------------------------------------------------------------------------- - - //-------------------------------------------------------------------------- - // - // Constructors - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>StreamingAMFEndpoint</code>. - */ - public StreamingAMFEndpoint() - { - this(false); - } - - /** - * Constructs a <code>StreamingAMFEndpoint</code> with the indicated management. - * - * @param enableManagement <code>true</code> if the <code>StreamingAMFEndpoint</code> - * is manageable; <code>false</code> otherwise. - */ - public StreamingAMFEndpoint(boolean enableManagement) - { - super(enableManagement); - } - - //-------------------------------------------------------------------------- - // - // Protected Methods - // - //-------------------------------------------------------------------------- - - /** - * Create the gateway filters that transform action requests - * and responses. The chain is built fresh on every call, in this order: - * serialization, batch, an optional session filter (only when session - * rewriting is enabled), legacy envelope, then message broker. - * - * @return The head of the chain (the serialization filter). - */ - @Override protected AMFFilter createFilterChain() - { - AMFFilter serializationFilter = new SerializationFilter(getLogCategory()); - AMFFilter batchFilter = new BatchProcessFilter(); - AMFFilter sessionFilter = sessionRewritingEnabled? new SessionFilter() : null; - AMFFilter envelopeFilter = new LegacyFilter(this); - AMFFilter messageBrokerFilter = new MessageBrokerFilter(this); - - serializationFilter.setNext(batchFilter); - if (sessionFilter != null) - { - batchFilter.setNext(sessionFilter); - sessionFilter.setNext(envelopeFilter); - } - else - { - batchFilter.setNext(envelopeFilter); - } - envelopeFilter.setNext(messageBrokerFilter); - - return serializationFilter; - } - - /** - * Returns MessageIOConstants.AMF_CONTENT_TYPE. 
- */ - @Override protected String getResponseContentType() - { - return MessageIOConstants.AMF_CONTENT_TYPE; - } - - /** - * Returns the log category of the endpoint. - * - * @return The log category of the endpoint. - */ - @Override protected String getLogCategory() - { - return LOG_CATEGORY; - } - - /** - * Used internally for performance information gathering; not intended for - * public use. Serializes the message in AMF format and returns the size of - * the serialized message. If serialization fails with an - * <code>IOException</code>, the failure is only logged at debug level and the - * number of bytes written up to that point is returned. - * - * @param message Message to get the size for. - * - * @return The size of the message after message is serialized. - */ - @Override protected long getMessageSizeForPerformanceInfo(Message message) - { - Amf3Output amfOut = new Amf3Output(serializationContext); - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - DataOutputStream dataOutStream = new DataOutputStream(outStream); - amfOut.setOutputStream(dataOutStream); - try - { - amfOut.writeObject(message); - } - catch (IOException e) - { - if (Log.isDebug()) - log.debug("MPI exception while retrieving the size of the serialized message: " + e.toString()); - } - return dataOutStream.size(); - } - - /** - * Returns the deserializer class name used by the endpoint. - * - * @return The deserializer class name used by the endpoint. - */ - @Override protected String getDeserializerClassName() - { - return "flex.messaging.io.amf.AmfMessageDeserializer"; - } - - /** - * Returns the serializer class name used by the endpoint. - * - * @return The serializer class name used by the endpoint. - */ - @Override protected String getSerializerClassName() - { - return "flex.messaging.io.amf.AmfMessageSerializer"; - } - - /** - * Invoked automatically to allow the <code>StreamingAMFEndpoint</code> to setup its - * corresponding MBean control. - * - * @param broker The <code>MessageBroker</code> that manages this - * <code>StreamingAMFEndpoint</code>. 
- */ - @Override protected void setupEndpointControl(MessageBroker broker) - { - controller = new StreamingAMFEndpointControl(this, broker.getControl()); - controller.register(); - setControl(controller); - } - - /** - * Helper method invoked by the endpoint request handler thread cycling in wait-notify. - * Serializes messages and streams each to the client as a response chunk using streamChunk(). - * The endpoint's type marshaller is installed on <code>TypeMarshallingContext</code> - * for the duration of the loop and is always cleared again in a finally block, - * so a failure while serializing or streaming does not leave it installed. - * - * @param messages The messages to serialize and push to the client. - * @param os The output stream the chunk will be written to. - * @param response The HttpServletResponse, used to flush the chunk to the client. - * @throws IOException If serializing or streaming a message fails. - */ - @Override protected void streamMessages(List messages, ServletOutputStream os, HttpServletResponse response) throws IOException - { - if (messages == null || messages.isEmpty()) - return; - - // Serialize each message as a separate chunk of bytes. - TypeMarshallingContext.setTypeMarshaller(getTypeMarshaller()); - try - { - for (Iterator iter = messages.iterator(); iter.hasNext();) - { - Message message = (Message)iter.next(); - addPerformanceInfo(message); - message = convertPushMessageToSmall(message); - if (Log.isDebug()) - log.debug("Endpoint with id '" + getId() + "' is streaming message: " + message); - - Amf3Output amfOut = new Amf3Output(serializationContext); - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - DataOutputStream dataOutStream = new DataOutputStream(outStream); - amfOut.setOutputStream(dataOutStream); - - amfOut.writeObject(message); - dataOutStream.flush(); - byte[] messageBytes = outStream.toByteArray(); - streamChunk(messageBytes, os, response); - - if (isManaged()) - ((StreamingAMFEndpointControl)controller).incrementPushCount(); - } - } - finally - { - TypeMarshallingContext.setTypeMarshaller(null); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/StreamingHTTPEndpoint.java ---------------------------------------------------------------------- diff 
// --git a/core/src/flex/messaging/endpoints/StreamingHTTPEndpoint.java b/core/src/flex/messaging/endpoints/StreamingHTTPEndpoint.java deleted file mode 100644 index a5e1c66..0000000 --- a/core/src/flex/messaging/endpoints/StreamingHTTPEndpoint.java +++ /dev/null @@ -1,239 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package flex.messaging.endpoints;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;

import flex.management.runtime.messaging.endpoints.StreamingHTTPEndpointControl;
import flex.messaging.MessageBroker;
import flex.messaging.endpoints.amf.AMFFilter;
import flex.messaging.endpoints.amf.BatchProcessFilter;
import flex.messaging.endpoints.amf.MessageBrokerFilter;
import flex.messaging.endpoints.amf.SerializationFilter;
import flex.messaging.endpoints.amf.SessionFilter;
import flex.messaging.io.MessageIOConstants;
import flex.messaging.io.TypeMarshallingContext;
import flex.messaging.io.amfx.AmfxOutput;
import flex.messaging.log.Log;
import flex.messaging.log.LogCategories;
import flex.messaging.messages.Message;

/**
 * Extension to the HTTPEndpoint to support streaming HTTP connections to connected
 * clients.
 * Each streaming connection managed by this endpoint consumes one of the request
 * handler threads provided by the servlet container, so it is not highly scalable
 * but offers performance advantages over client polling for clients receiving a steady,
 * rapid stream of pushed messages.
 * This endpoint does not support polling clients and will fault any poll requests
 * that are received. To support polling clients use HTTPEndpoint instead.
 */
public class StreamingHTTPEndpoint extends BaseStreamingHTTPEndpoint
{
    //--------------------------------------------------------------------------
    //
    // Public Constants
    //
    //--------------------------------------------------------------------------

    /**
     * The log category for this endpoint.
     */
    public static final String LOG_CATEGORY = LogCategories.ENDPOINT_STREAMING_HTTP;

    //--------------------------------------------------------------------------
    //
    // Constructors
    //
    //--------------------------------------------------------------------------

    /**
     * Constructs an unmanaged <code>StreamingHTTPEndpoint</code>.
     */
    public StreamingHTTPEndpoint()
    {
        this(false);
    }

    /**
     * Constructs a <code>StreamingHTTPEndpoint</code> with the indicated management.
     *
     * @param enableManagement <code>true</code> if the <code>StreamingHTTPEndpoint</code>
     * is manageable; <code>false</code> otherwise.
     */
    public StreamingHTTPEndpoint(boolean enableManagement)
    {
        super(enableManagement);
    }

    //--------------------------------------------------------------------------
    //
    // Protected Methods
    //
    //--------------------------------------------------------------------------

    /**
     * Builds a new filter chain on every call. The chain is: serialization filter,
     * batch process filter, an optional session filter (only when
     * <code>sessionRewritingEnabled</code> is set), then the message broker filter.
     *
     * @return The head of the chain, i.e. the serialization filter.
     */
    @Override protected AMFFilter createFilterChain()
    {
        AMFFilter serializationFilter = new SerializationFilter(getLogCategory());
        AMFFilter batchFilter = new BatchProcessFilter();
        AMFFilter sessionFilter = sessionRewritingEnabled? new SessionFilter() : null;
        AMFFilter messageBrokerFilter = new MessageBrokerFilter(this);

        serializationFilter.setNext(batchFilter);
        if (sessionFilter != null)
        {
            batchFilter.setNext(sessionFilter);
            sessionFilter.setNext(messageBrokerFilter);
        }
        else
        {
            batchFilter.setNext(messageBrokerFilter);
        }

        return serializationFilter;
    }

    /**
     * Returns MessageIOConstants.XML_CONTENT_TYPE.
     *
     * @return The response content type used by this endpoint.
     */
    @Override protected String getResponseContentType()
    {
        return MessageIOConstants.XML_CONTENT_TYPE;
    }

    /**
     * Returns the log category of the endpoint.
     *
     * @return The log category of the endpoint.
     */
    @Override protected String getLogCategory()
    {
        return LOG_CATEGORY;
    }

    /**
     * Used internally for performance information gathering; not intended for
     * public use. Serializes the message in AMFX format and returns the size
     * of the serialized message. An <code>IOException</code> raised while
     * serializing is logged at debug level and not rethrown; the number of bytes
     * written up to that point is returned in that case.
     *
     * @param message Message to get the size for.
     *
     * @return The size of the message after message is serialized.
     */
    @Override protected long getMessageSizeForPerformanceInfo(Message message)
    {
        AmfxOutput amfxOut = new AmfxOutput(serializationContext);
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        DataOutputStream dataOutStream = new DataOutputStream(outStream);
        amfxOut.setOutputStream(dataOutStream);
        try
        {
            amfxOut.writeObject(message);
        }
        catch (IOException e)
        {
            if (Log.isDebug())
                log.debug("MPI exception while retrieving the size of the serialized message: " + e.toString());
        }
        return dataOutStream.size();
    }

    /**
     * Returns the deserializer class name used by the endpoint.
     *
     * @return The deserializer class name used by the endpoint.
     */
    @Override protected String getDeserializerClassName()
    {
        return "flex.messaging.io.amfx.AmfxMessageDeserializer";
    }

    /**
     * Returns the serializer class name used by the endpoint.
     *
     * @return The serializer class name used by the endpoint.
     */
    @Override protected String getSerializerClassName()
    {
        return "flex.messaging.io.amfx.AmfxMessageSerializer";
    }

    /**
     * Invoked automatically to allow the <code>StreamingHTTPEndpoint</code> to setup its
     * corresponding MBean control.
     *
     * @param broker The <code>MessageBroker</code> that manages this
     * <code>StreamingHTTPEndpoint</code>.
     */
    @Override protected void setupEndpointControl(MessageBroker broker)
    {
        controller = new StreamingHTTPEndpointControl(this, broker.getControl());
        controller.register();
        setControl(controller);
    }

    /**
     * Helper method invoked by the endpoint request handler thread cycling in wait-notify.
     * Serializes messages and streams each to the client as a response chunk using streamChunk().
     * Does nothing when <code>messages</code> is <code>null</code> or empty.
     *
     * @param messages The messages to serialize and push to the client.
     * @param os The output stream the chunk will be written to.
     * @param response The HttpServletResponse, used to flush the chunk to the client.
     * @throws IOException If serializing a message or streaming its chunk fails; the
     * type marshaller installed for the duration of the call is still cleared.
     */
    @Override protected void streamMessages(List messages, ServletOutputStream os, HttpServletResponse response) throws IOException
    {
        if (messages == null || messages.isEmpty())
            return;

        TypeMarshallingContext.setTypeMarshaller(getTypeMarshaller());
        try
        {
            // Serialize each message as a separate chunk of bytes.
            for (Object next : messages)
            {
                Message message = (Message)next;
                addPerformanceInfo(message);

                message = convertPushMessageToSmall(message);

                if (Log.isDebug())
                    log.debug("Endpoint with id '" + getId() + "' is streaming message: " + message);

                AmfxOutput amfxOut = new AmfxOutput(serializationContext);
                ByteArrayOutputStream outStream = new ByteArrayOutputStream();
                DataOutputStream dataOutStream = new DataOutputStream(outStream);
                amfxOut.setOutputStream(dataOutStream);

                amfxOut.writeObject(message);
                dataOutStream.flush();
                byte[] messageBytes = outStream.toByteArray();
                streamChunk(messageBytes, os, response);

                if (isManaged())
                    ((StreamingHTTPEndpointControl)controller).incrementPushCount();
            }
        }
        finally
        {
            // Always clear the marshaller installed above, even when serialization
            // or streaming throws, so it is never left set after this call returns.
            TypeMarshallingContext.setTypeMarshaller(null);
        }
    }
}
// \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/amf/AMFFilter.java ---------------------------------------------------------------------- diff
--git a/core/src/flex/messaging/endpoints/amf/AMFFilter.java b/core/src/flex/messaging/endpoints/amf/AMFFilter.java deleted file mode 100644 index 5abaea2..0000000 --- a/core/src/flex/messaging/endpoints/amf/AMFFilter.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.endpoints.amf; - -import flex.messaging.io.amf.ActionContext; - -import java.io.IOException; - -/** - * Filters perform pre- and post-processing duties on the ActionContext, - * which contains the message/invocation as well as conextual information - * about it, following the standard pipe-and-filter design pattern. - */ -public abstract class AMFFilter -{ - protected AMFFilter next; - - public AMFFilter() - { - } - - public void setNext(AMFFilter next) - { - this.next = next; - } - - public AMFFilter getNext() - { - return next; - } - - /** - * The core business method. 
- */ - public abstract void invoke(final ActionContext context) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/amf/BatchProcessFilter.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/amf/BatchProcessFilter.java b/core/src/flex/messaging/endpoints/amf/BatchProcessFilter.java deleted file mode 100644 index 8376979..0000000 --- a/core/src/flex/messaging/endpoints/amf/BatchProcessFilter.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.endpoints.amf; - -import flex.messaging.io.amf.ActionContext; -import flex.messaging.io.amf.MessageBody; -import flex.messaging.io.MessageIOConstants; -import flex.messaging.io.RecoverableSerializationException; - -/** - * Filter that breaks down the batched message buffer into individual invocations. 
- */ -public class BatchProcessFilter extends AMFFilter -{ - public BatchProcessFilter() - { - } - - public void invoke(final ActionContext context) - { - // Process each action in the body - int bodyCount = context.getRequestMessage().getBodyCount(); - - // Report batch size in Debug mode - //gateway.getLogger().logDebug("Processing batch of " + bodyCount + " request(s)"); - - for (context.setMessageNumber(0); context.getMessageNumber() < bodyCount; context.incrementMessageNumber()) - { - try - { - // create the response body - MessageBody responseBody = new MessageBody(); - responseBody.setTargetURI(context.getRequestMessageBody().getResponseURI()); - - // append the response body to the output message - context.getResponseMessage().addBody(responseBody); - - //Check that deserialized message body data type was valid. If not, skip this message. - Object o = context.getRequestMessageBody().getData(); - - if (o != null && o instanceof RecoverableSerializationException) - { - context.getResponseMessageBody().setData(((RecoverableSerializationException)o).createErrorMessage()); - context.getResponseMessageBody().setReplyMethod(MessageIOConstants.STATUS_METHOD); - continue; - } - - // invoke next filter in the chain - next.invoke(context); - } - catch (Exception e) - { - // continue invoking on next message body despite error - } - } - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/amf/LegacyFilter.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/amf/LegacyFilter.java b/core/src/flex/messaging/endpoints/amf/LegacyFilter.java deleted file mode 100644 index fc52755..0000000 --- a/core/src/flex/messaging/endpoints/amf/LegacyFilter.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.endpoints.amf; - -import flex.messaging.endpoints.BaseHTTPEndpoint; -import flex.messaging.io.amf.ASObject; -import flex.messaging.io.amf.ActionContext; -import flex.messaging.io.amf.MessageBody; -import flex.messaging.io.amf.MessageHeader; -import flex.messaging.messages.Message; -import flex.messaging.messages.RemotingMessage; -import flex.messaging.messages.ErrorMessage; - -import java.io.IOException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.lang.reflect.Array; - -/** - * AMF Headers are of limited use because the apply to the entire AMF packet, which - * may contain a batch of several requests. - * <p> - * Rather than relying on the Flash Player team to change the AMF specification, - * Flex 1.5 introduced the concept of a Message Envelope that allowed them to provide - * message level headers that apply to a single request body. 
- * </p> - * <p> - * Essentially they introduced one more layer of indirection with an ASObject of type "Envelope" - * that had two properties:<br /> - * - <i>headers</i>, which was an array of Header structures<br /> - * - <i>body</i>, which was the actual data of the request (typically an array of arguments) - * </p> - * <p> - * To save space on the wire, a Header structure was simply an array. The first element was - * the header name as a String, and was the only required field. The second element, a boolean, - * indicated whether the header must be understood. The third element, any Object, represented - * the header value, if required. - * </p> - */ -public class LegacyFilter extends AMFFilter -{ - public static final String LEGACY_ENVELOPE_FLAG_KEY = "_flag"; - public static final String LEGACY_ENVELOPE_FLAG_VALUE = "Envelope"; - public static final String LEGACY_SECURITY_HEADER_NAME = "Credentials"; - public static final String LEGACY_SECURITY_PRINCIPAL = "userid"; - public static final String LEGACY_SECURITY_CREDENTIALS = "password"; - - private BaseHTTPEndpoint endpoint; - - public LegacyFilter(BaseHTTPEndpoint endpoint) - { - this.endpoint = endpoint; - } - - public void invoke(final ActionContext context) throws IOException - { - MessageBody requestBody = context.getRequestMessageBody(); - context.setLegacy(true); - - // Parameters are usually sent as an AMF Array - Object data = requestBody.getData(); - List newParams = null; - - // Check whether we're a new Flex 2.0 Messaging request - if (data != null) - { - if (data.getClass().isArray()) - { - int paramLength = Array.getLength(data); - if (paramLength == 1) - { - Object obj = Array.get(data, 0); - if (obj != null && obj instanceof Message) - { - context.setLegacy(false); - newParams = new ArrayList(); - newParams.add(obj); - } - } - - // It was not a Flex 2.0 Message, but we have an array, use its contents as our params - if (newParams == null) - { - newParams = new ArrayList(); - for (int i = 0; i 
< paramLength; i++) - { - try - { - newParams.add(Array.get(data, i)); - } - catch (Throwable t) - { - } - } - } - } - else if (data instanceof List) - { - List paramList = (List)data; - if (paramList.size() == 1) - { - Object obj = paramList.get(0); - if (obj != null && obj instanceof Message) - { - context.setLegacy(false); - newParams = new ArrayList(); - newParams.add(obj); - } - } - - // It was not a Flex 2.0 Message, but we have a list, so use it as our params - if (newParams == null) - { - newParams = (List)data; - } - } - } - - // We still haven't found any lists of params, so create one with - // whatever data we have. - if (newParams == null) - { - newParams = new ArrayList(); - newParams.add(data); - - } - - if (context.isLegacy()) - { - newParams = legacyRequest(context, newParams); - } - - requestBody.setData(newParams); - - - next.invoke(context); - - - if (context.isLegacy()) - { - MessageBody responseBody = context.getResponseMessageBody(); - Object response = responseBody.getData(); - - if (response instanceof ErrorMessage) - { - ErrorMessage error = (ErrorMessage)response; - ASObject aso = new ASObject(); - aso.put("message", error.faultString); - aso.put("code", error.faultCode); - aso.put("details", error.faultDetail); - aso.put("rootCause", error.rootCause); - response = aso; - } - else if (response instanceof Message) - { - response = ((Message)response).getBody(); - } - responseBody.setData(response); - } - } - - private List legacyRequest(ActionContext context, List oldParams) - { - List newParams = new ArrayList(1); - Map headerMap = new HashMap(); - Object body = oldParams; - Message message = null; - MessageBody requestBody = context.getRequestMessageBody(); - - // Legacy Packet Security - List packetHeaders = context.getRequestMessage().getHeaders(); - packetCredentials(packetHeaders, headerMap); - - - // Legacy Body - if (oldParams.size() == 1) - { - Object obj = oldParams.get(0); - - if (obj != null && obj instanceof ASObject) - { - 
ASObject aso = (ASObject)obj; - - // Unwrap legacy Flex 1.5 Envelope type - if (isEnvelope(aso)) - { - body = aso.get("data"); - - // Envelope level headers - Object h = aso.get("headers"); - if (h != null && h instanceof List) - { - readEnvelopeHeaders((List)h, headerMap); - envelopeCredentials(headerMap); - } - } - } - } - - // Convert legacy body into a RemotingMessage - message = createMessage(requestBody, body, headerMap); - newParams.add(message); - return newParams; - } - - private boolean isEnvelope(ASObject aso) - { - String flag = null; - Object f = aso.get(LEGACY_ENVELOPE_FLAG_KEY); - if (f != null && f instanceof String) - flag = (String)f; - - if (flag != null && flag.equalsIgnoreCase(LEGACY_ENVELOPE_FLAG_VALUE)) - { - return true; - } - - return false; - } - - - private RemotingMessage createMessage(MessageBody messageBody, Object body, Map headerMap) - { - RemotingMessage remotingMessage = new RemotingMessage(); - // Assigning an empty String, MessageBroker expects non-null messageId. 
- remotingMessage.setMessageId(""); - remotingMessage.setBody(body); - remotingMessage.setHeaders(headerMap); - - // Decode legacy target URI into destination.operation - String targetURI = messageBody.getTargetURI(); - - int dotIndex = targetURI.lastIndexOf("."); - if (dotIndex > 0) - { - String destination = targetURI.substring(0, dotIndex); - remotingMessage.setDestination(destination); - } - - if (targetURI.length() > dotIndex) - { - String operation = targetURI.substring(dotIndex + 1); - remotingMessage.setOperation(operation); - } - - return remotingMessage; - } - - - private Map readEnvelopeHeaders(List headers, Map headerMap) - { - int count = headers.size(); - - for (int i = 0; i < count; i++) - { - Object obj = headers.get(i); - - //We currently expect a plain old AS Array - if (obj != null && obj instanceof List) - { - List h = (List)obj; - - Object name = null; - //Object mustUnderstand = null; - Object data = null; - - int numFields = h.size(); - - //The array must have exactly three (3) fields - if (numFields == 3) - { - name = h.get(0); - - if (name != null && name instanceof String) - { - //mustUnderstand = h.get(1); - data = h.get(2); - headerMap.put(name, data); - } - } - } - } - - return headerMap; - } - - private void envelopeCredentials(Map headers) - { - // Process Legacy Security Credentials - Object obj = headers.get(LEGACY_SECURITY_HEADER_NAME); - if (obj != null && obj instanceof ASObject) - { - ASObject header = (ASObject)obj; - String principal = (String)header.get(LEGACY_SECURITY_PRINCIPAL); - Object credentials = header.get(LEGACY_SECURITY_CREDENTIALS); - endpoint.getMessageBroker().getLoginManager().login(principal, credentials.toString()); - } - headers.remove(LEGACY_SECURITY_HEADER_NAME); - } - - private void packetCredentials(List packetHeaders, Map headers) - { - if (packetHeaders.size() > 0) - { - for (Iterator iter = packetHeaders.iterator(); iter.hasNext();) - { - MessageHeader header = (MessageHeader)iter.next(); - if 
(header.getName().equals(LEGACY_SECURITY_HEADER_NAME)) - { - Map loginInfo = (Map)header.getData(); - String principal = loginInfo.get(LEGACY_SECURITY_PRINCIPAL).toString(); - Object credentials = loginInfo.get(LEGACY_SECURITY_CREDENTIALS); - endpoint.getMessageBroker().getLoginManager().login(principal, credentials.toString()); - break; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/amf/MessageBrokerFilter.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/amf/MessageBrokerFilter.java b/core/src/flex/messaging/endpoints/amf/MessageBrokerFilter.java deleted file mode 100644 index d8288f3..0000000 --- a/core/src/flex/messaging/endpoints/amf/MessageBrokerFilter.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging.endpoints.amf; - -import flex.messaging.FlexContext; -import flex.messaging.FlexSession; -import flex.messaging.MessageException; -import flex.messaging.endpoints.AbstractEndpoint; -import flex.messaging.io.MessageIOConstants; -import flex.messaging.io.amf.ActionContext; -import flex.messaging.io.amf.MessageBody; -import flex.messaging.messages.CommandMessage; -import flex.messaging.messages.ErrorMessage; -import flex.messaging.messages.Message; -import flex.messaging.messages.MessagePerformanceUtils; -import flex.messaging.services.MessageService; -import flex.messaging.util.StringUtils; -import flex.messaging.log.LogCategories; -import flex.messaging.log.Log; -import flex.messaging.util.ExceptionUtil; -import flex.messaging.util.UUIDUtils; - -import java.util.List; -import java.lang.reflect.Array; - -/** - * A simple bridge between the encoding/decoding functionality of - * the AMF endpoint and the MessageBroker: this last filter in the - * chain returns the message to the MessageBroker, which will then - * locate the correct service to handle the message. - */ -public class MessageBrokerFilter extends AMFFilter -{ - private static final int UNHANDLED_ERROR = 10000; - static final String LOG_CATEGORY = LogCategories.MESSAGE_GENERAL; - - protected AbstractEndpoint endpoint; - - public MessageBrokerFilter(AbstractEndpoint endpoint) - { - this.endpoint = endpoint; - } - - public void invoke(final ActionContext context) - { - MessageBody request = context.getRequestMessageBody(); - MessageBody response = context.getResponseMessageBody(); - - Message inMessage = request.getDataAsMessage(); - - Object outMessage = null; - - String replyMethodName = MessageIOConstants.STATUS_METHOD; - - try - { - // Lookup or create the correct FlexClient. - endpoint.setupFlexClient(inMessage); - - // Assign a clientId if necessary. - // We don't need to assign clientIds to general poll requests. 
- if (inMessage.getClientId() == null && - (!(inMessage instanceof CommandMessage) || ((CommandMessage)inMessage).getOperation() != CommandMessage.POLL_OPERATION)) - { - Object clientId = UUIDUtils.createUUID(); - inMessage.setClientId(clientId); - } - - // Messages received via the AMF channel can be batched (by NetConnection on the client) and - // we must not put the handler thread into a poll-wait state if a poll command message is followed by - // or preceeded by other messages in the batch; the request-response loop must complete without waiting. - // If the poll command is the only message in the batch it's ok to wait. - // If it isn't ok to wait, tag the poll message with a header that short-circuits any potential poll-wait. - if (inMessage instanceof CommandMessage) - { - CommandMessage command = (CommandMessage)inMessage; - if ((command.getOperation() == CommandMessage.POLL_OPERATION) && (context.getRequestMessage().getBodyCount() != 1)) - command.setHeader(CommandMessage.SUPPRESS_POLL_WAIT_HEADER, Boolean.TRUE); - } - - // If MPI is enabled update the MPI metrics on the object referred to by the context - // and the messages - if (context.isMPIenabled()) - MessagePerformanceUtils.setupMPII(context, inMessage); - - // Service the message. 
- outMessage = endpoint.serviceMessage(inMessage); - - // if processing of the message resulted in an error, set up context and reply method accordingly - if (outMessage instanceof ErrorMessage) - { - context.setStatus(MessageIOConstants.STATUS_ERR); - replyMethodName = MessageIOConstants.STATUS_METHOD; - } - else - { - replyMethodName = MessageIOConstants.RESULT_METHOD; - } - } - catch (MessageException e) - { - context.setStatus(MessageIOConstants.STATUS_ERR); - replyMethodName = MessageIOConstants.STATUS_METHOD; - - outMessage = e.createErrorMessage(); - ((ErrorMessage)outMessage).setCorrelationId(inMessage.getMessageId()); - ((ErrorMessage)outMessage).setDestination(inMessage.getDestination()); - ((ErrorMessage)outMessage).setClientId(inMessage.getClientId()); - - e.logAtHingePoint(inMessage, (ErrorMessage)outMessage, null /* Use default message intros */); - } - catch (Throwable t) - { - // Handle any uncaught failures. The normal exception path on the server - // is to throw MessageExceptions which are handled in the catch block above, - // so if that was skipped we have an overlooked or serious problem. 
- context.setStatus(MessageIOConstants.STATUS_ERR); - replyMethodName = MessageIOConstants.STATUS_METHOD; - - String lmeMessage = t.getMessage(); - if (lmeMessage == null) - lmeMessage = t.getClass().getName(); - - MessageException lme = new MessageException(); - lme.setMessage(UNHANDLED_ERROR, new Object[] {lmeMessage}); - - outMessage = lme.createErrorMessage(); - ((ErrorMessage)outMessage).setCorrelationId(inMessage.getMessageId()); - ((ErrorMessage)outMessage).setDestination(inMessage.getDestination()); - ((ErrorMessage)outMessage).setClientId(inMessage.getClientId()); - - if (Log.isError()) - { - Log.getLogger(LOG_CATEGORY).error("Unhandled error when processing a message: " + - t.toString() + StringUtils.NEWLINE + - " incomingMessage: " + inMessage + StringUtils.NEWLINE + - " errorReply: " + outMessage + StringUtils.NEWLINE + - ExceptionUtil.exceptionFollowedByRootCausesToString(t) + StringUtils.NEWLINE); - } - } - finally - { - // If MPI is enabled update the MPI metrics on the object referred to by the context - // and the messages - if (context.isRecordMessageSizes() || context.isRecordMessageTimes()) - { - MessagePerformanceUtils.updateOutgoingMPI(context, inMessage, outMessage); - } - - // If our channel-endpoint combination supports small messages, and - // if we know the current protocol version supports small messages, - // try to replace the message... - FlexSession session = FlexContext.getFlexSession(); - if (session != null && session.useSmallMessages() - && !context.isLegacy() - && context.getVersion() >= MessageIOConstants.AMF3 - && outMessage instanceof Message) - { - outMessage = endpoint.convertToSmallMessage((Message)outMessage); - } - - response.setReplyMethod(replyMethodName); - response.setData(outMessage); - } - } -}
