http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java b/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java deleted file mode 100644 index 0353354..0000000 --- a/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java +++ /dev/null @@ -1,606 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging.endpoints; - -import flex.messaging.FlexContext; -import flex.messaging.FlexSession; -import flex.messaging.client.FlexClient; -import flex.messaging.client.FlushResult; -import flex.messaging.client.PollFlushResult; -import flex.messaging.client.PollWaitListener; -import flex.messaging.client.UserAgentSettings; -import flex.messaging.config.ConfigMap; -import flex.messaging.config.ConfigurationConstants; -import flex.messaging.log.Log; -import flex.messaging.messages.CommandMessage; -import flex.messaging.util.UserAgentManager; - -import java.util.Enumeration; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Base class for HTTP-based endpoints that support regular polling and long polling, - * which means placing request threads that are polling for messages into a wait - * state until messages are available for delivery or the configurable wait interval - * is reached. - */ -public abstract class BasePollingHTTPEndpoint extends BaseHTTPEndpoint implements PollWaitListener -{ - //-------------------------------------------------------------------------- - // - // Private Static Constants - // - //-------------------------------------------------------------------------- - - private static final String POLLING_ENABLED = "polling-enabled"; - private static final String POLLING_INTERVAL_MILLIS = "polling-interval-millis"; - private static final String POLLING_INTERVAL_SECONDS = "polling-interval-seconds"; // Deprecated configuration option. - private static final String MAX_WAITING_POLL_REQUESTS = "max-waiting-poll-requests"; - private static final String WAIT_INTERVAL_MILLIS = "wait-interval-millis"; - private static final String CLIENT_WAIT_INTERVAL_MILLIS = "client-wait-interval-millis"; - // Force clients that exceed the long-poll limit to wait at least this long between poll requests. - // This matches the default polling interval defined in the client PollingChannel. 
- private static final int DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS = 3000; - - // User Agent based settings manager - private UserAgentManager userAgentManager = new UserAgentManager(); - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>BasePollingHTTPEndpoint</code>. - */ - public BasePollingHTTPEndpoint() - { - this(false); - } - - /** - * Constructs an <code>BasePollingHTTPEndpoint</code> with the indicated management. - * - * @param enableManagement <code>true</code> if the <code>BasePollingHTTPEndpoint</code> - * is manageable; <code>false</code> otherwise. - */ - public BasePollingHTTPEndpoint(boolean enableManagement) - { - super(enableManagement); - } - - //-------------------------------------------------------------------------- - // - // Initialize, validate, start, and stop methods. - // - //-------------------------------------------------------------------------- - - /** - * Initializes the <code>Endpoint</code> with the properties. - * If subclasses override this method, they must call <code>super.initialize()</code>. - * - * @param id The ID of the <code>Endpoint</code>. - * @param properties Properties for the <code>Endpoint</code>. - */ - @Override - public void initialize(String id, ConfigMap properties) - { - super.initialize(id, properties); - - if (properties == null || properties.size() == 0) - { - // Initialize default user agent manager settings. - UserAgentManager.setupUserAgentManager(null, userAgentManager); - - return; // Nothing else to initialize. - } - - // General poll props. 
- pollingEnabled = properties.getPropertyAsBoolean(POLLING_ENABLED, false); - pollingIntervalMillis = properties.getPropertyAsLong(POLLING_INTERVAL_MILLIS, -1); - long pollingIntervalSeconds = properties.getPropertyAsLong(POLLING_INTERVAL_SECONDS, -1); // Deprecated - if (pollingIntervalSeconds > -1) - pollingIntervalMillis = pollingIntervalSeconds * 1000; - - // Piggybacking props. - piggybackingEnabled = properties.getPropertyAsBoolean(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, false); - - // HTTP poll wait props. - maxWaitingPollRequests = properties.getPropertyAsInt(MAX_WAITING_POLL_REQUESTS, 0); - waitInterval = properties.getPropertyAsLong(WAIT_INTERVAL_MILLIS, 0); - clientWaitInterval = properties.getPropertyAsInt(CLIENT_WAIT_INTERVAL_MILLIS, 0); - - // User Agent props. - UserAgentManager.setupUserAgentManager(properties, userAgentManager); - - // Set initial state for the canWait flag based on whether we allow waits or not. - if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0)) - { - waitEnabled = true; - canWait = true; - } - } - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - /** - * This flag is volatile to allow for consistent reads across thread without - * needing to pay the cost for a synchronized lock for each read. - */ - private volatile boolean canWait; - - /** - * Used to synchronize sets and gets to the number of waiting clients. - */ - protected final Object lock = new Object(); - - /** - * Set when properties are handled; used as a shortcut for logging to determine whether this - * instance attempts to put request threads in a wait state or not. - */ - private boolean waitEnabled; - - /** - * A count of the number of request threads that are currently in the wait state (including - * those on their way into or out of it). 
- */ - protected int waitingPollRequestsCount; - - /** - * A Map(notification Object for a waited request thread, Boolean.TRUE). - */ - private ConcurrentHashMap currentWaitedRequests; - - //-------------------------------------------------------------------------- - // - // Properties - // - //-------------------------------------------------------------------------- - - //---------------------------------- - // clientWaitInterval - //---------------------------------- - - protected int clientWaitInterval = 0; - - /** - * Retrieves the number of milliseconds the client will wait after receiving a response for - * a poll with server wait before it issues its next poll request. - * A value of zero or less causes the client to use its default polling interval (based on the - * channel's polling-interval-millis configuration) and this value is ignored. - * A value greater than zero will cause the client to wait for the specified interval before - * issuing its next poll request with a value of 1 triggering an immediate poll from the client - * as soon as a waited poll response is received. - * @return The client wait interval. - */ - public int getClientWaitInterval() - { - return clientWaitInterval; - } - - /** - * Sets the number of milliseconds a client will wait after receiving a response for a poll - * with server wait before it issues its next poll request. - * A value of zero or less causes the client to use its default polling interval (based on the - * channel's polling-interval-millis configuration) and this value is ignored. - * A value greater than zero will cause the client to wait for the specified interval before - * issuing its next poll request with a value of 1 triggering an immediate poll from the client - * as soon as a waited poll response is received. - * This property does not effect polling clients that poll the server without a server wait. 
- * - * @param value The number of milliseconds a client will wait before issuing its next poll when the - * server is configured to wait. - */ - public void setClientWaitInterval(int value) - { - clientWaitInterval = value; - } - - //---------------------------------- - // maxWaitingPollRequests - //---------------------------------- - - protected int maxWaitingPollRequests = 0; - - /** - * Retrieves the maximum number of server poll response threads that will be - * waiting for messages to arrive for clients. - * @return The maximum number of waiting poll requests. - */ - public int getMaxWaitingPollRequests() - { - return maxWaitingPollRequests; - } - - /** - * Sets the maximum number of server poll response threads that will be - * waiting for messages to arrive for clients. - * - * @param maxWaitingPollRequests The maximum number of server poll response threads - * that will be waiting for messages to arrive for the client. - */ - public void setMaxWaitingPollRequests(int maxWaitingPollRequests) - { - this.maxWaitingPollRequests = maxWaitingPollRequests; - if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0)) - { - waitEnabled = true; - canWait = (waitingPollRequestsCount < maxWaitingPollRequests); - } - } - - //---------------------------------- - // pollingEnabled - //---------------------------------- - - /** - * - * This is a property used on the client. - */ - protected boolean piggybackingEnabled; - - //---------------------------------- - // pollingEnabled - //---------------------------------- - - /** - * - * This is a property used on the client. - */ - protected boolean pollingEnabled; - - //---------------------------------- - // pollingIntervalMillis - //---------------------------------- - - /** - * - * This is a property used on the client. 
- */ - protected long pollingIntervalMillis = -1; - - //---------------------------------- - // waitInterval - //---------------------------------- - - protected long waitInterval = 0; - - /** - * Retrieves the number of milliseconds the server poll response thread will be - * waiting for messages to arrive for the client. - * @return The wait interval. - */ - public long getWaitInterval() - { - return waitInterval; - } - - /** - * Sets the number of milliseconds the server poll response thread will be - * waiting for messages to arrive for the client. - * - * @param waitInterval The number of milliseconds the server poll response thread will be - * waiting for messages to arrive for the client. - */ - public void setWaitInterval(long waitInterval) - { - this.waitInterval = waitInterval; - if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0)) - { - waitEnabled = true; - canWait = (waitingPollRequestsCount < maxWaitingPollRequests); - } - } - - //---------------------------------- - // waitingPollRequestsCount - //---------------------------------- - - /** - * Retrieves the count of the number of request threads that are currently in the wait state - * (including those on their way into or out of it). - * - * @return The count of the number of request threads that are currently in the wait state. - */ - public int getWaitingPollRequestsCount() - { - return waitingPollRequestsCount; - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * - * Returns a <code>ConfigMap</code> of endpoint properties that the client - * needs. This includes properties from <code>super.describeEndpoint</code> - * and additional <code>BaseHTTPEndpoint</code> specific properties under - * "properties" key. 
- */ - @Override - public ConfigMap describeEndpoint() - { - ConfigMap endpointConfig = super.describeEndpoint(); - - boolean createdProperties = false; - ConfigMap properties = endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null); - if (properties == null) - { - properties = new ConfigMap(); - createdProperties = true; - } - - if (pollingEnabled) - { - ConfigMap pollingEnabled = new ConfigMap(); - // Adding as a value rather than attribute to the parent - pollingEnabled.addProperty(EMPTY_STRING, TRUE_STRING); - properties.addProperty(POLLING_ENABLED, pollingEnabled); - } - - if (pollingIntervalMillis > -1) - { - ConfigMap pollingInterval = new ConfigMap(); - // Adding as a value rather than attribute to the parent - pollingInterval.addProperty(EMPTY_STRING, String.valueOf(pollingIntervalMillis)); - properties.addProperty(POLLING_INTERVAL_MILLIS, pollingInterval); - } - - if (piggybackingEnabled) - { - ConfigMap piggybackingEnabled = new ConfigMap(); - // Adding as a value rather than attribute to the parent - piggybackingEnabled.addProperty(EMPTY_STRING, String.valueOf(piggybackingEnabled)); - properties.addProperty(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, piggybackingEnabled); - } - - if (createdProperties && properties.size() > 0) - endpointConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties); - - return endpointConfig; - } - - /** - * Sets up monitoring of waited poll requests so they can be notified and exit when the - * endpoint stops. - * - * @see flex.messaging.endpoints.AbstractEndpoint#start() - */ - @Override - public void start() - { - if (isStarted()) - return; - - super.start(); - - currentWaitedRequests = new ConcurrentHashMap(); - } - - /** - * Ensures that no poll requests in a wait state are left un-notified when the endpoint stops. - * - * @see flex.messaging.endpoints.AbstractEndpoint#stop() - */ - @Override - public void stop() - { - if (!isStarted()) - return; - - // Notify any currently waiting polls. 
- Enumeration keys = currentWaitedRequests.keys(); - while (keys.hasMoreElements()) - { - Object notifier = keys.nextElement(); - synchronized (notifier) - { - notifier.notifyAll(); // Break any current waits. - } - } - currentWaitedRequests = null; - - super.stop(); - } - - /** - * (non-Javaodc) - * @see flex.messaging.client.PollWaitListener#waitStart(Object) - */ - public void waitStart(Object notifier) - { - currentWaitedRequests.put(notifier, Boolean.TRUE); - } - - /** - * (non-Javaodc) - * @see flex.messaging.client.PollWaitListener#waitEnd(Object) - */ - public void waitEnd(Object notifier) - { - if (currentWaitedRequests != null) - currentWaitedRequests.remove(notifier); - } - - //-------------------------------------------------------------------------- - // - // Protected Methods - // - //-------------------------------------------------------------------------- - - /** - * Overrides the base poll handling to support optionally putting Http request handling threads - * into a wait state until messages are available to be delivered in the poll response or a timeout is reached. - * The number of threads that may be put in a wait state is bounded by <code>max-waiting-poll-requests</code> - * and waits will only be attempted if the canWait flag that is based on the <code>max-waiting-poll-requests</code> - * and the specified <code>wait-interval</code> is true. - * - * @param flexClient The FlexClient that issued the poll request. - * @param pollCommand The poll command from the client. - * @return The flush info used to build the poll response. - */ - @Override - protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand) - { - FlushResult flushResult = null; - if (canWait && !pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER)) - { - FlexSession session = FlexContext.getFlexSession(); - // If canWait is true it means we currently have less than the max number of allowed waiting threads. 
- - // We need to protect writes/reads to the wait count with the endpoint's lock. - // Also, we have to be careful to handle the case where two threads get to this point when only - // one wait spot remains; one thread will win and the other needs to revert to a non-waitable poll. - boolean thisThreadCanWait; - synchronized (lock) - { - ++waitingPollRequestsCount; - if (waitingPollRequestsCount == maxWaitingPollRequests) - { - thisThreadCanWait = true; // This thread got the last wait spot. - canWait = false; - } - else if (waitingPollRequestsCount > maxWaitingPollRequests) - { - thisThreadCanWait = false; // This thread was beaten out for the last spot. - --waitingPollRequestsCount; // Decrement the count because we're not going to try a poll with wait. - canWait = false; // All the wait spots are currently occupied so prevent further attempts for now. - } - else - { - // We haven't hit the limit yet, allow this thread to wait. - thisThreadCanWait = true; - } - } - - // Check the max waiting connections per session count - if (thisThreadCanWait) - { - String userAgentValue = FlexContext.getHttpRequest().getHeader(UserAgentManager.USER_AGENT_HEADER_NAME); - UserAgentSettings agentSettings = userAgentManager.match(userAgentValue); - synchronized(session) - { - if (agentSettings != null) - session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession(); - - ++session.streamingConnectionsCount; - if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED - || session.streamingConnectionsCount <= session.maxConnectionsPerSession) - { - thisThreadCanWait = true; // We haven't hit the limit yet, allow the wait. - } - else // (session.streamingConnectionsCount > session.maxConnectionsPerSession) - { - thisThreadCanWait = false; // no more from this client - --session.streamingConnectionsCount; - } - } - - if (!thisThreadCanWait) - { - // Decrement the waiting poll count, since this poll isn't going to wait. 
- synchronized (lock) - { - --waitingPollRequestsCount; - if (waitingPollRequestsCount < maxWaitingPollRequests) - canWait = true; - } - if (Log.isDebug()) - { - log.debug("Max long-polling requests per session limit (" + session.maxConnectionsPerSession + ") has been reached, this poll won't wait."); - } - } - - } - - if (thisThreadCanWait) - { - if (Log.isDebug()) - log.debug("Number of waiting threads for endpoint with id '"+ getId() +"' is " + waitingPollRequestsCount + "."); - - try - { - flushResult = flexClient.pollWithWait(getId(), FlexContext.getFlexSession(), this, waitInterval); - if (flushResult != null) - { - // Prevent busy-polling due to multiple clients sharing a session and swapping each other out too quickly. - if ((flushResult instanceof PollFlushResult) && ((PollFlushResult)flushResult).isAvoidBusyPolling() && (flushResult.getNextFlushWaitTimeMillis() < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS)) - { - // Force the client polling interval to match the default defined in the client PollingChannel. - flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS); - } - else if ((clientWaitInterval > 0) && (flushResult.getNextFlushWaitTimeMillis() == 0)) - { - // If the FlushResult doesn't specify it's own flush wait time, use the configured clientWaitInterval if defined. 
- flushResult.setNextFlushWaitTimeMillis(clientWaitInterval); - } - } - } - finally - { - // We're done waiting so decrement the count of waiting threads and update the canWait flag if necessary - synchronized (lock) - { - --waitingPollRequestsCount; - if (waitingPollRequestsCount < maxWaitingPollRequests) - canWait = true; - } - synchronized (session) - { - --session.streamingConnectionsCount; - } - - if (Log.isDebug()) - log.debug("Number of waiting threads for endpoint with id '"+ getId() +"' is " + waitingPollRequestsCount + "."); - } - } - } - else if (Log.isDebug() && waitEnabled) - { - if (pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER)) - log.debug("Suppressing poll wait for this request because it is part of a batch of messages to process."); - else - log.debug("Max waiting poll requests limit '" + maxWaitingPollRequests + "' has been reached for endpoint '" + getId() + "'. FlexClient with id '"+ flexClient.getId() + "' will poll with no wait."); - } - - // If we weren't able to do a poll with wait above for any reason just run the base poll handling logic. - if (flushResult == null) - { - flushResult = super.handleFlexClientPoll(flexClient, pollCommand); - // If this is an excess poll request that we couldn't wait on, make sure the client doesn't poll the endpoint too aggressively. - // In this case, force a client wait to match the default polling interval defined in the client PollingChannel. - if ( waitEnabled && (pollingIntervalMillis < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS)) - { - if (flushResult == null) - { - flushResult = new FlushResult(); - } - flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS); - } - } - - return flushResult; - } -}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java b/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java deleted file mode 100644 index 7aa7328..0000000 --- a/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java +++ /dev/null @@ -1,1226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package flex.messaging.endpoints; - -import flex.messaging.FlexContext; -import flex.messaging.FlexSession; -import flex.messaging.HttpFlexSession; -import flex.messaging.MessageException; -import flex.messaging.client.EndpointPushNotifier; -import flex.messaging.client.FlexClient; -import flex.messaging.client.FlushResult; -import flex.messaging.client.UserAgentSettings; -import flex.messaging.config.ConfigMap; -import flex.messaging.log.Log; -import flex.messaging.messages.AcknowledgeMessage; -import flex.messaging.messages.AsyncMessage; -import flex.messaging.messages.CommandMessage; -import flex.messaging.messages.Message; -import flex.messaging.messages.MessagePerformanceInfo; -import flex.messaging.messages.MessagePerformanceUtils; -import flex.messaging.util.TimeoutManager; -import flex.messaging.util.UserAgentManager; - -import javax.servlet.ServletOutputStream; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadFactory; - -/** - * Base class for HTTP-based endpoints that support streaming HTTP connections to - * connected clients. - * Each streaming connection managed by this endpoint consumes one of the request - * handler threads provided by the servlet container, so it is not highly scalable - * but offers performance advantages over client polling for clients receiving - * a steady, rapid stream of pushed messages. - * This endpoint does not support polling clients and will fault any poll requests - * that are received. To support polling clients use subclasses of - * BaseHTTPEndpoint instead. 
- */ -public abstract class BaseStreamingHTTPEndpoint extends BaseHTTPEndpoint -{ - //-------------------------------------------------------------------------- - // - // Private Static Constants - // - //-------------------------------------------------------------------------- - - /** - * This token is used in chunked HTTP responses frequently so initialize it statically for general use. - */ - private static final byte[] CRLF_BYTES = {(byte)13, (byte)10}; - - /** - * This token is used for the terminal chunk within a chunked response. - */ - private static final byte ZERO_BYTE = (byte)48; - - /** - * This token is used to signal that a chunk of data should be skipped by the client. - */ - private static final byte NULL_BYTE = (byte)0; - - /** - * Parameter name for 'command' passed in a request for a new streaming connection. - */ - private static final String COMMAND_PARAM_NAME = "command"; - - /** - * This is the token at the end of the HTTP request line that indicates that it is - * a stream connection that should be held open to push data back to the client, - * as opposed to a regular request-response message. - */ - private static final String OPEN_COMMAND = "open"; - - /** - * This is the token at the end of the HTTP request line that indicates that it is - * a stream connection that should be closed. - */ - private static final String CLOSE_COMMAND = "close"; - - /** - * Parameter name for the stream ID; passed with commands for an existing streaming connection. - */ - private static final String STREAM_ID_PARAM_NAME = "streamId"; - - /** - * Constant for HTTP/1.0. - */ - private static final String HTTP_1_0 = "HTTP/1.0"; - - /** - * Thread name suffix for request threads that are servicing a pinned open streaming connection. - */ - private static final String STREAMING_THREAD_NAME_EXTENSION = "-in-streaming-mode"; - - /** - * Configuration constants. 
- */ - private static final String PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES = "connection-idle-timeout-minutes"; - private static final String PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES = "idle-timeout-minutes"; - private static final String MAX_STREAMING_CLIENTS = "max-streaming-clients"; - private static final String SERVER_TO_CLIENT_HEARTBEAT_MILLIS = "server-to-client-heartbeat-millis"; - private static final String PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = "invalidate-messageclient-on-streaming-close"; - - /** - * Defaults. - */ - private static final boolean DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = false; - private static final int DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS = 5000; - private static final int DEFAULT_MAX_STREAMING_CLIENTS = 10; - - /** - * Errors. - */ - public static final String POLL_NOT_SUPPORTED_CODE = "Server.PollNotSupported"; - public static final int POLL_NOT_SUPPORTED_MESSAGE = 10034; - - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>BaseStreamingHTTPEndpoint</code>. - */ - public BaseStreamingHTTPEndpoint() - { - this(false); - } - - /** - * Constructs an <code>BaseStreamingHTTPEndpoint</code> with the indicated management. - * - * @param enableManagement <code>true</code> if the <code>BaseStreamingHTTPEndpoint</code> - * is manageable; <code>false</code> otherwise. - */ - public BaseStreamingHTTPEndpoint(boolean enableManagement) - { - super(enableManagement); - } - - //-------------------------------------------------------------------------- - // - // Initialize, validate, start, and stop methods. - // - //-------------------------------------------------------------------------- - - /** - * Initializes the <code>Endpoint</code> with the properties. 
- * If subclasses override this method, they must call <code>super.initialize()</code>. - * - * @param id The ID of the <code>Endpoint</code>. - * @param properties Properties for the <code>Endpoint</code>. - */ - @Override - public void initialize(String id, ConfigMap properties) - { - super.initialize(id, properties); - - if (properties == null || properties.size() == 0) - { - // Initialize default user agent manager settings. - UserAgentManager.setupUserAgentManager(null, userAgentManager); - - return; // Nothing else to initialize. - } - - // The interval that the server will check if the client is still available. - serverToClientHeartbeatMillis = properties.getPropertyAsLong(SERVER_TO_CLIENT_HEARTBEAT_MILLIS, DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS); - setServerToClientHeartbeatMillis(serverToClientHeartbeatMillis); - - setInvalidateMessageClientOnStreamingClose(properties.getPropertyAsBoolean(PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE, DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE)); - - // Number of minutes a client can remain idle before the server times the connection out. - int connectionIdleTimeoutMinutes = properties.getPropertyAsInt(PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes()); - if (connectionIdleTimeoutMinutes != 0) - { - setConnectionIdleTimeoutMinutes(connectionIdleTimeoutMinutes); - } - else - { - connectionIdleTimeoutMinutes = properties.getPropertyAsInt(PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes()); - if (connectionIdleTimeoutMinutes != 0) - setConnectionIdleTimeoutMinutes(connectionIdleTimeoutMinutes); - } - - // User-agent configuration for kick-start bytes and max streaming connections per session. - UserAgentManager.setupUserAgentManager(properties, userAgentManager); - - // Maximum number of clients allowed to have streaming HTTP connections with the endpoint. 
- maxStreamingClients = properties.getPropertyAsInt(MAX_STREAMING_CLIENTS, DEFAULT_MAX_STREAMING_CLIENTS); - - // Set initial state for the canWait flag based on whether we allow waits or not. - canStream = (maxStreamingClients > 0); - } - - - @Override - public void start() - { - if (isStarted()) - return; - - super.start(); - - if (connectionIdleTimeoutMinutes > 0) - { - pushNotifierTimeoutManager = new TimeoutManager(new ThreadFactory() - { - int counter = 1; - public synchronized Thread newThread(Runnable runnable) - { - Thread t = new Thread(runnable); - t.setName(getId() + "-StreamingConnectionTimeoutThread-" + counter++); - return t; - } - }); - } - - currentStreamingRequests = new ConcurrentHashMap<String, EndpointPushNotifier>(); - } - - /** - * (non-JavaDoc) - * @see flex.messaging.endpoints.AbstractEndpoint#stop() - */ - @Override - public void stop() - { - if (!isStarted()) - return; - - // Shutdown the timeout manager for streaming connections cleanly. - if (pushNotifierTimeoutManager != null) - { - pushNotifierTimeoutManager.shutdown(); - pushNotifierTimeoutManager = null; - } - - // Shutdown any currently open streaming connections. - for (EndpointPushNotifier notifier : currentStreamingRequests.values()) - notifier.close(); - - currentStreamingRequests = null; - - super.stop(); - } - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - /** - * Used to synchronize sets and gets to the number of streaming clients. - */ - protected final Object lock = new Object(); - - /** - * Used to keep track of the mapping between user agent match strings and - * the bytes needed to kickstart their streaming connections. 
- */ - protected UserAgentManager userAgentManager = new UserAgentManager(); - - /** - * This flag is volatile to allow for consistent reads across thread without - * needing to pay the cost for a synchronized lock for each read. - */ - private volatile boolean canStream = true; - - /** - * Manages timing out EndpointPushNotifier instances. - */ - private volatile TimeoutManager pushNotifierTimeoutManager; - - /** - * A Map(EndpointPushNotifier, Boolean.TRUE) containing all currently open streaming notifiers - * for this endpoint. - * Used for clean shutdown. - */ - private ConcurrentHashMap<String, EndpointPushNotifier> currentStreamingRequests; - - //-------------------------------------------------------------------------- - // - // Properties - // - //-------------------------------------------------------------------------- - - //------------------------------------------ - // invalidateMessageClientOnStreamingClose - //----------------------------------------- - - private volatile boolean invalidateMessageClientOnStreamingClose = DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE; - - /** - * Returns whether invalidate-messageclient-on-streaming-close is enabled. - * See {@link BaseStreamingHTTPEndpoint#setInvalidateMessageClientOnStreamingClose(boolean)} - * for details. - * - * @return <code>true</code> if the invalidate-messageclient-on-streaming-close is enabled, <code>false</code> otherwise. - */ - public boolean isInvalidateMessageClientOnStreamingClose() - { - return invalidateMessageClientOnStreamingClose; - } - - /** - * Sets the invalidate-messageclient-on-streaming close property. If enabled, - * when the streaming connection is closed for whatever reason (for example, the client is gone), - * the client's associated MessageClient on the server is invalidated immediately. 
- * This is useful in scenarios where there is a constant stream of messages, the client is gone, - * and the streaming connection is closed, but the session has not timed out on the server yet. - * In that case, enabling this property will prevent messages accumulating on the session on behalf - * of the MessageClient that will invalidate. - * <p> - * Important: Do not enable this property when reliable messaging is used, otherwise - * reliable reconnect attempts will not happen correctly.</p> - * - * @param value The property value. - */ - public void setInvalidateMessageClientOnStreamingClose(boolean value) - { - invalidateMessageClientOnStreamingClose = value; - } - - //---------------------------------- - // serverToClientHeartbeatMillis - //---------------------------------- - - private long serverToClientHeartbeatMillis = DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS; - - /** - * Retrieves the number of milliseconds the server will wait before writing a - * single <code>null</code> byte to the streaming connection, to ensure the client is - * still available. - * @return The server-to-client heartbeat time in milliseconds. - */ - public long getServerToClientHeartbeatMillis() - { - return serverToClientHeartbeatMillis; - } - - /** - * Retrieves the number of milliseconds the server will wait before writing a - * single <code>null</code> byte to the streaming connection to ensure the client is - * still available when there are no new messages for the client. - * A non-positive value means the server will wait forever for new messages and - * it will not write the <code>null</code> byte to determine if the client is available. - * @param serverToClientHeartbeatMillis The server-to-client heartbeat time in milliseconds. 
- */ - public void setServerToClientHeartbeatMillis(long serverToClientHeartbeatMillis) - { - if (serverToClientHeartbeatMillis < 0) - serverToClientHeartbeatMillis = 0; - this.serverToClientHeartbeatMillis = serverToClientHeartbeatMillis; - } - - //---------------------------------- - // connectionIdleTimeoutMinutes - //---------------------------------- - - private int connectionIdleTimeoutMinutes = 0; - - /** - * Retrieves the number of minutes a client can remain idle before the server - * times the connection out. The default value is 0, indicating that connections - * will not be timed out and must be closed by the client or server, either explicitly - * or by either process terminating. - * - * @return The number of minutes a client can remain idle before the server - * times the connection out. - */ - public int getConnectionIdleTimeoutMinutes() - { - return connectionIdleTimeoutMinutes; - } - - /** - * Sets the number of minutes a client can remain idle before the server - * times the connection out. A value of 0 or below indicates that - * connections will not be timed out. - * - * @param value The number of minutes a client can remain idle - * before the server times the connection out. - */ - public void setConnectionIdleTimeoutMinutes(int value) - { - if (value < 0) - value = 0; - - this.connectionIdleTimeoutMinutes = value; - } - - /** - * (non-JavaDoc) - * @deprecated Use {@link BaseStreamingHTTPEndpoint#getConnectionIdleTimeoutMinutes()} instead. - */ - public int getIdleTimeoutMinutes() - { - return getConnectionIdleTimeoutMinutes(); - } - - /** - * (non-JavaDoc) - * @deprecated Use {@link BaseStreamingHTTPEndpoint#setConnectionIdleTimeoutMinutes(int)} instead. 
- */ - public void setIdleTimeoutMinutes(int value) - { - setConnectionIdleTimeoutMinutes(value); - } - - //---------------------------------- - // maxStreamingClients - //---------------------------------- - - private int maxStreamingClients = DEFAULT_MAX_STREAMING_CLIENTS; - - /** - * Retrieves the maximum number of clients that will be allowed to establish - * a streaming HTTP connection with the endpoint. - * - * @return The maximum number of clients that will be allowed to establish - * a streaming HTTP connection with the endpoint. - */ - public int getMaxStreamingClients() - { - return maxStreamingClients; - } - - /** - * Sets the maximum number of clients that will be allowed to establish - * a streaming HTTP connection with the server. - * - * @param maxStreamingClients The maximum number of clients that will be allowed - * to establish a streaming HTTP connection with the server. - */ - public void setMaxStreamingClients(int maxStreamingClients) - { - this.maxStreamingClients = maxStreamingClients; - canStream = (streamingClientsCount < maxStreamingClients); - } - - //---------------------------------- - // streamingClientsCount - //---------------------------------- - - protected int streamingClientsCount; - - /** - * Retrieves the the number of clients that are currently in the streaming state. - * - * @return The number of clients that are currently in the streaming state. - */ - public int getStreamingClientsCount() - { - return streamingClientsCount; - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Handles HTTP requests targetting this endpoint. - * Two types or requests are supported. If the request is a regular request-response AMF/AMFX - * message it is handled by the base logic in BaseHTTPEndpoint.service. 
However, if it is a - * request to open a streaming HTTP connection to the client this endpoint performs some - * validation checks and then holds the connection open to stream data back to the client - * over. - * - * @param req The original servlet request. - * @param res The active servlet response. - */ - @Override - public void service(HttpServletRequest req, HttpServletResponse res) - { - String command = req.getParameter(COMMAND_PARAM_NAME); - if (command != null) - serviceStreamingRequest(req, res); - else // Let BaseHTTPEndpoint logic handle regular request-response messaging. - super.service(req, res); - } - - //-------------------------------------------------------------------------- - // - // Protected Methods - // - //-------------------------------------------------------------------------- - - /** - * If the message has MPI enabled, this method adds all the needed performance - * headers to the message. - * - * @param message Message to add performance headers to. - */ - protected void addPerformanceInfo(Message message) - { - MessagePerformanceInfo mpiOriginal = getMPI(message); - if (mpiOriginal == null) - return; - - MessagePerformanceInfo mpip = (MessagePerformanceInfo)mpiOriginal.clone(); - try - { - // Set the original message info as the pushed causer info. 
- MessagePerformanceUtils.setMPIP(message, mpip); - MessagePerformanceUtils.setMPII(message, null); - } - catch (Exception e) - { - if (Log.isDebug()) - log.debug("MPI exception while streaming the message: " + e.toString()); - } - - // Overhead only used when MPI is enabled for sizing - MessagePerformanceInfo mpio = new MessagePerformanceInfo(); - if (mpip.recordMessageTimes) - { - mpio.sendTime = System.currentTimeMillis(); - mpio.infoType = "OUT"; - } - mpio.pushedFlag = true; - MessagePerformanceUtils.setMPIO(message, mpio); - - // If MPI sizing information is enabled serialize again so that we know size - if (mpip.recordMessageSizes) - { - try - { - // Each subclass serializes the message in their own format to - // get the message size for the MPIO. - long serializationOverhead = System.currentTimeMillis(); - mpio.messageSize = getMessageSizeForPerformanceInfo(message); - - // Set serialization overhead to the time calculated during serialization above - if (mpip.recordMessageTimes) - { - serializationOverhead = System.currentTimeMillis() - serializationOverhead; - mpip.addToOverhead(serializationOverhead); - mpiOriginal.addToOverhead(serializationOverhead); - mpio.sendTime = System.currentTimeMillis(); - } - } - catch(Exception e) - { - log.debug("MPI exception while streaming the message: " + e.toString()); - } - } - } - - /** - * Utility method to convert streamed push messages to their small versions - * if the channel-endpoint combination supports small messages. - * - * @param message The regular message. - * @return The small message if the message has a small version, or regular message - * if it doesn't . - */ - protected Message convertPushMessageToSmall(Message message) - { - FlexSession session = FlexContext.getFlexSession(); - if (session != null && session.useSmallMessages()) - return convertToSmallMessage(message); - return message; - } - - /** - * Used internally for performance information gathering; not intended for - * public use. 
The default implementation of this method returns zero. - * Subclasses should overwrite if they want to accurately report message - * size information in performance information gathering. - * - * @param message Message to get the size for. - * - * @return The size of the message after message is serialized. - */ - protected long getMessageSizeForPerformanceInfo(Message message) - { - return 0; - } - - /** - * This streaming endpoint does not support polling clients. - * - * @param flexClient The FlexClient that issued the poll request. - * @param pollCommand The poll command from the client. - * @return The flush info used to build the poll response. - */ - @Override - protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand) - { - MessageException me = new MessageException(); - me.setMessage(POLL_NOT_SUPPORTED_MESSAGE); - me.setDetails(POLL_NOT_SUPPORTED_MESSAGE); - me.setCode(POLL_NOT_SUPPORTED_CODE); - throw me; - } - - /** - * Handles streaming connection open command sent by the FlexClient. - * - * @param req The <code>HttpServletRequest</code> to service. - * @param res The <code>HttpServletResponse</code> to be used in case an error - * has to be sent back. - * @param flexClient FlexClient that requested the streaming connection. - */ - protected void handleFlexClientStreamingOpenRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient) - { - FlexSession session = FlexContext.getFlexSession(); - if (canStream && session.canStream) - { - // If canStream/session.canStream is true it means we currently have - // less than the max number of allowed streaming threads, per endpoint/session. - - // We need to protect writes/reads to the stream count with the endpoint's lock. - // Also, we have to be careful to handle the case where two threads get to this point when only - // one streaming spot remains; one thread will win and the other needs to fault. 
- boolean thisThreadCanStream; - synchronized (lock) - { - ++streamingClientsCount; - if (streamingClientsCount == maxStreamingClients) - { - thisThreadCanStream = true; // This thread got the last spot. - canStream = false; - } - else if (streamingClientsCount > maxStreamingClients) - { - thisThreadCanStream = false; // This thread was beaten out for the last spot. - --streamingClientsCount; // Decrement the count because we're not going to grant the streaming right to the client. - } - else - { - // We haven't hit the limit yet, allow this thread to stream. - thisThreadCanStream = true; - } - } - - // If the thread cannot wait due to endpoint streaming connection - // limit, inform the client and return. - if (!thisThreadCanStream) - { - String errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '" - + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit of '" - + maxStreamingClients + "' has been reached."; - if (Log.isError()) - log.error(errorMessage); - try - { - errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '" - + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit has been reached."; - res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage); - } - catch (IOException ignore) - {} - return; - } - - // Setup for specific user agents. 
- byte[] kickStartBytesToStream = null; - String userAgentValue = req.getHeader(UserAgentManager.USER_AGENT_HEADER_NAME); - UserAgentSettings agentSettings = userAgentManager.match(userAgentValue); - if (agentSettings != null) - { - synchronized (session) - { - session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession(); - } - - int kickStartBytes = agentSettings.getKickstartBytes(); - if (kickStartBytes > 0) - { - // Determine the minimum number of actual bytes that need to be sent to - // kickstart, taking into account transfer-encoding overhead. - try - { - int chunkLengthHeaderSize = Integer.toHexString(kickStartBytes).getBytes("ASCII").length; - int chunkOverhead = chunkLengthHeaderSize + 4; // 4 for the 2 wrapping CRLF tokens. - int minimumKickstartBytes = kickStartBytes - chunkOverhead; - kickStartBytesToStream = new byte[(minimumKickstartBytes > 0) ? minimumKickstartBytes : - kickStartBytes]; - } - catch (UnsupportedEncodingException ignore) - { - kickStartBytesToStream = new byte[kickStartBytes]; - } - Arrays.fill(kickStartBytesToStream, NULL_BYTE); - } - } - - // Now, check with the session before granting the streaming connection. - synchronized(session) - { - ++session.streamingConnectionsCount; - if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED) - { - thisThreadCanStream = true; - } - else if (session.streamingConnectionsCount == session.maxConnectionsPerSession) - { - thisThreadCanStream = true; // This thread got the last spot in the session. - session.canStream = false; - } - else if (session.streamingConnectionsCount > session.maxConnectionsPerSession) - { - thisThreadCanStream = false; // This thread was beaten out for the last spot. - --session.streamingConnectionsCount; - synchronized(lock) - { - // Decrement the endpoint count because we're not going to grant the streaming right to the client. 
- --streamingClientsCount; - } - } - else - { - // We haven't hit the limit yet, allow this thread to stream. - thisThreadCanStream = true; - } - } - - // If the thread cannot wait due to session streaming connection - // limit, inform the client and return. - if (!thisThreadCanStream) - { - if (Log.isError()) - log.error("Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '" - + flexClient.getId() + "' because " + UserAgentManager.MAX_PERSISTENT_CONNECTIONS_PER_SESSION + " limit of '" + session.maxConnectionsPerSession - + ((agentSettings != null) ? "' for user-agent '" + agentSettings.getMatchOn() + "'" : "") + " has been reached." ); - try - { - // Return an HTTP status code 400. - String errorMessage = "The server cannot grant streaming connection to this client because limit has been reached."; - res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage); - } - catch (IOException ignore) - { - // NOWARN - } - return; - } - - Thread currentThread = Thread.currentThread(); - String threadName = currentThread.getName(); - EndpointPushNotifier notifier = null; - boolean suppressIOExceptionLogging = false; // Used to suppress logging for IO exception. - try - { - currentThread.setName(threadName + STREAMING_THREAD_NAME_EXTENSION); - - // Open and commit response headers and get output stream. - if (addNoCacheHeaders) - addNoCacheHeaders(req, res); - res.setContentType(getResponseContentType()); - res.setHeader("Transfer-Encoding", "chunked"); - res.setHeader("Connection", "close"); - ServletOutputStream os = res.getOutputStream(); - res.flushBuffer(); - - // If kickstart-bytes are specified, stream them. 
- if (kickStartBytesToStream != null) - { - if (Log.isDebug()) - log.debug("Endpoint with id '" + getId() + "' is streaming " + kickStartBytesToStream.length - + " bytes (not counting chunk encoding overhead) to kick-start the streaming connection for FlexClient with id '" - + flexClient.getId() + "'."); - - streamChunk(kickStartBytesToStream, os, res); - } - - // Setup serialization and type marshalling contexts - setThreadLocals(); - - // Activate streaming helper for this connection. - // Watch out for duplicate stream issues. - try - { - notifier = new EndpointPushNotifier(this, flexClient); - } - catch (MessageException me) - { - if (me.getNumber() == 10033) // It's a duplicate stream request from the same FlexClient. Leave the current stream in place and fault this. - { - if (Log.isWarn()) - log.warn("Endpoint with id '" + getId() + "' received a duplicate streaming connection request from, FlexClient with id '" - + flexClient.getId() + "'. Faulting request."); - - // Rollback counters and send an error response. - synchronized (lock) - { - --streamingClientsCount; - canStream = (streamingClientsCount < maxStreamingClients); - synchronized (session) - { - --session.streamingConnectionsCount; - session.canStream = (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED - || session.streamingConnectionsCount < session.maxConnectionsPerSession); - } - } - try - { - res.sendError(HttpServletResponse.SC_BAD_REQUEST); - } - catch (IOException ignore) - { - // NOWARN - } - return; // Exit early. - } - } - if (connectionIdleTimeoutMinutes > 0) - notifier.setIdleTimeoutMinutes(connectionIdleTimeoutMinutes); - notifier.setLogCategory(getLogCategory()); - monitorTimeout(notifier); - currentStreamingRequests.put(notifier.getNotifierId(), notifier); - - // Push down an acknowledgement for the 'connect' request containing the unique id for this specific stream. 
- AcknowledgeMessage connectAck = new AcknowledgeMessage(); - connectAck.setBody(notifier.getNotifierId()); - connectAck.setCorrelationId(BaseStreamingHTTPEndpoint.OPEN_COMMAND); - ArrayList toPush = new ArrayList(1); - toPush.add(connectAck); - streamMessages(toPush, os, res); - - // Output session level streaming count. - if (Log.isDebug()) - Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming clients for FlexSession with id '"+ session.getId() +"' is " + session.streamingConnectionsCount + "."); - - // Output endpoint level streaming count. - if (Log.isDebug()) - log.debug("Number of streaming clients for endpoint with id '"+ getId() +"' is " + streamingClientsCount + "."); - - // And cycle in a wait-notify loop with the aid of the helper until it - // is closed, we're interrupted or the act of streaming data to the client fails. - while (!notifier.isClosed()) - { - try - { - // Drain any messages that might have been accumulated - // while the previous drain was being processed. - List<AsyncMessage> messages = null; - synchronized (notifier.pushNeeded) - { - messages = notifier.drainMessages(); - } - streamMessages(messages, os, res); - - synchronized (notifier.pushNeeded) - { - notifier.pushNeeded.wait(serverToClientHeartbeatMillis); - - messages = notifier.drainMessages(); - } - // If there are no messages to send to the client, send an null - // byte as a heartbeat to make sure the client is still valid. - if (messages == null && serverToClientHeartbeatMillis > 0) - { - try - { - os.write(NULL_BYTE); - res.flushBuffer(); - } - catch (IOException e) - { - if (Log.isWarn()) - log.warn("Endpoint with id '" + getId() + "' is closing the streaming connection to FlexClient with id '" - + flexClient.getId() + "' because endpoint encountered a socket write error" + - ", possibly due to an unresponsive FlexClient.", e); - break; // Exit the wait loop. - } - } - // Otherwise stream the messages to the client. 
- else - { - // Update the last time notifier was used to drain messages. - // Important for idle timeout detection. - notifier.updateLastUse(); - - streamMessages(messages, os, res); - } - } - catch (InterruptedException e) - { - if (Log.isWarn()) - log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' has been interrupted and the streaming connection will be closed."); - os.close(); - break; // Exit the wait loop. - } - - // Update the FlexClient last use time to prevent FlexClient from - // timing out when the client is still subscribed. It is important - // to do this outside synchronized(notifier.pushNeeded) to avoid - // thread deadlock! - flexClient.updateLastUse(); - } - if (Log.isDebug()) - log.debug("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' is releasing connection and returning to the request handler pool."); - suppressIOExceptionLogging = true; - // Terminate the response. - streamChunk(null, os, res); - } - catch (IOException e) - { - if (Log.isWarn() && !suppressIOExceptionLogging) - log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' is closing connection due to an IO error.", e); - } - finally - { - currentThread.setName(threadName); - - // We're done so decrement the counts for streaming threads, - // and update the canStream flag if necessary. - synchronized (lock) - { - --streamingClientsCount; - canStream = (streamingClientsCount < maxStreamingClients); - synchronized (session) - { - --session.streamingConnectionsCount; - session.canStream = (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED - || session.streamingConnectionsCount < session.maxConnectionsPerSession); - } - } - - if (notifier != null && currentStreamingRequests != null) - { - currentStreamingRequests.remove(notifier.getNotifierId()); - notifier.close(); - } - - // Output session level streaming count. 
- if (Log.isDebug()) - Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming clients for FlexSession with id '"+ session.getId() +"' is " + session.streamingConnectionsCount + "."); - - // Output endpoint level streaming count. - if (Log.isDebug()) - log.debug("Number of streaming clients for endpoint with id '"+ getId() +"' is " + streamingClientsCount + "."); - } - } - // Otherwise, client's streaming connection open request could not be granted. - else - { - if (Log.isError()) - { - String logString = null; - if (!canStream) - { - logString = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '" - + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit of '" - + maxStreamingClients + "' has been reached."; - } - else if (!session.canStream) - { - logString = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '" - + flexClient.getId() + "' because " + UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit of '" - + session.maxConnectionsPerSession + "' has been reached."; - } - if (logString != null) - log.error(logString); - } - - try - { - // Return an HTTP status code 400 to indicate that client request can't be processed. 
- String errorMessage = null; - if (!canStream) - { - errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '" - + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit has been reached."; - } - else if (!session.canStream) - { - errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '" - + flexClient.getId() + "' because " + UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit has been reached."; - } - res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage); - } - catch (IOException ignore) - {} - } - } - - /** - * Handles streaming connection close command sent by the FlexClient. - * - * @param req The <code>HttpServletRequest</code> to service. - * @param res The <code>HttpServletResponse</code> to be used in case an error - * has to be sent back. - * @param flexClient FlexClient that requested the streaming connection. - * @param streamId The id for the stream to close. - */ - protected void handleFlexClientStreamingCloseRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient, String streamId) - { - if (streamId != null) - { - EndpointPushNotifier notifier = (EndpointPushNotifier)flexClient.getEndpointPushHandler(getId()); - if ((notifier != null) && notifier.getNotifierId().equals(streamId)) - notifier.close(); - } - } - - /** - * Service streaming connection commands. - * - * @param req The <code>HttpServletRequest</code> to service. - * @param res The <code>HttpServletResponse</code> to be used in case an error - * has to be sent back. - */ - protected void serviceStreamingRequest(HttpServletRequest req, HttpServletResponse res) - { - // If this is a request for a streaming connection, make sure it's for a valid FlexClient - // and that the FlexSession doesn't already have a streaming connection open. 
- // Streaming requests are POSTs (to help prevent the possibility of caching) that carry the - // following parameters: - // command - Indicating a custom command for the endpoint; currently 'open' to request a new - // streaming connection be opened, and 'close' to request the streaming connection - // to close. - // version - Indicates the streaming connection 'version' to use; it's here for backward comp. support - // if we need to change how commands are handled in a future product release. - // DSId - The FlexClient id value that uniquely identifies the swf making the request. - String command = req.getParameter(COMMAND_PARAM_NAME); - - // Only HTTP 1.1 is supported, disallow HTTP 1.0. - if (req.getProtocol().equals(HTTP_1_0)) - { - if (Log.isError()) - log.error("Endpoint with id '" + getId() + "' cannot service the streaming request made with " + - " HTTP 1.0. Only HTTP 1.1 is supported."); - - try - { - // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (bad command). - res.sendError(HttpServletResponse.SC_BAD_REQUEST); - } - catch (IOException ignore) - {} - return; // Abort further server processing. - } - - if (!(command.equals(OPEN_COMMAND) || command.equals(CLOSE_COMMAND))) - { - if (Log.isError()) - log.error("Endpoint with id '" + getId() + "' cannot service the streaming request as the supplied command '" - + command + "' is invalid."); - - try - { - // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (bad command). - res.sendError(HttpServletResponse.SC_BAD_REQUEST); - } - catch (IOException ignore) - {} - return; // Abort further server processing. 
- } - - String flexClientId = req.getParameter(Message.FLEX_CLIENT_ID_HEADER); - if (flexClientId == null) - { - if (Log.isError()) - log.error("Endpoint with id '" + getId() + "' cannot service the streaming request as no FlexClient id" - + " has been supplied in the request."); - - try - { - // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (missing id). - res.sendError(HttpServletResponse.SC_BAD_REQUEST); - } - catch (IOException ignore) - {} - return; // Abort further server processing. - } - - // Validate that the provided FlexClient id exists and is associated with the current session. - // We don't do this validation with CLOSE_COMMAND because CLOSE_COMMAND can come in on a - // different session. For example, when the session expires due to timeout, the streaming client - // using that session sends a CLOSE_COMMAND on a new session to let the server know to clean client's - // corresponding server constructs. In that case, server already knows that session has expired so - // we can simply omit this validation. - FlexClient flexClient = null; - List<FlexClient> flexClients = FlexContext.getFlexSession().getFlexClients(); - boolean validFlexClientId = false; - for (Iterator<FlexClient> iter = flexClients.iterator(); iter.hasNext();) - { - flexClient = iter.next(); - if (flexClient.getId().equals(flexClientId) && flexClient.isValid()) - { - validFlexClientId = true; - break; - } - } - if (!command.equals(CLOSE_COMMAND) && !validFlexClientId) - { - if (Log.isError()) - log.error("Endpoint with id '" + getId() + "' cannot service the streaming request as either the supplied" - + " FlexClient id '" + flexClientId + " is not valid, or the FlexClient with that id is not valid."); - - try - { - // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (invalid id). 
- res.sendError(HttpServletResponse.SC_BAD_REQUEST); - } - catch (IOException ignore) - {} - return; // Abort further server processing. - } - - // If a close command is received and we don't have any flex clients registered simply invalidate - // the Flex Session. This will take care of the Flex Session that got created when the MB servlet - // was processing the CLOSE request. - if (command.equals(CLOSE_COMMAND) && flexClients.size() == 0) - { - FlexSession flexSession = FlexContext.getFlexSession(); - if (flexSession instanceof HttpFlexSession) - { - ((HttpFlexSession)flexSession).invalidate(false); - } - return; - } - - if (flexClient != null) - { - if (command.equals(OPEN_COMMAND)) - handleFlexClientStreamingOpenRequest(req, res, flexClient); - else if (command.equals(CLOSE_COMMAND)) - handleFlexClientStreamingCloseRequest(req, res, flexClient, req.getParameter(STREAM_ID_PARAM_NAME)); - } - } - - /** - * Helper method to write a chunk of bytes to the output stream in an HTTP - * "Transfer-Encoding: chunked" format. - * If the bytes array is null or empty, a terminal chunk will be written to - * signal the end of the response. - * Once the chunk is written to the output stream, the stream will be flushed immediately (no buffering). - * - * @param bytes The array of bytes to write as a chunk in the response; or if null, the signal to write the final chunk to complete the response. - * @param os The output stream the chunk will be written to. - * @param response The HttpServletResponse, used to flush the chunk to the client. - * - * @throws IOException if writing the chunk to the output stream fails. 
- */ - protected void streamChunk(byte[] bytes, ServletOutputStream os, HttpServletResponse response) throws IOException - { - if ((bytes != null) && (bytes.length > 0)) - { - byte[] chunkLength = Integer.toHexString(bytes.length).getBytes("ASCII"); - os.write(chunkLength); - os.write(CRLF_BYTES); - os.write(bytes); - os.write(CRLF_BYTES); - response.flushBuffer(); - } - else // Send final 'EOF' chunk for the response. - { - os.write(ZERO_BYTE); - os.write(CRLF_BYTES); - response.flushBuffer(); - } - } - - /** - * Helper method invoked by the endpoint request handler thread cycling in wait-notify. - * Serializes messages and streams each to the client as a response chunk using streamChunk(). - * - * @param messages The messages to serialize and push to the client. - * @param os The output stream the chunk will be written to. - * @param response The HttpServletResponse, used to flush the chunk to the client. - */ - protected abstract void streamMessages(List messages, ServletOutputStream os, HttpServletResponse response) throws IOException; - - /** - * Given a message, returns the MessagePerformanceInfo object if the message - * performance gathering is enabled, returns null otherwise. - * - * @param message The message. - * @return MessagePerformanceInfo if the message performance gathering is enabled, - * null otherwise. - */ - protected MessagePerformanceInfo getMPI(Message message) - { - return (isRecordMessageSizes() || isRecordMessageTimes())? - MessagePerformanceUtils.getMPII(message) : null; - } - - //-------------------------------------------------------------------------- - // - // Private methods. - // - //-------------------------------------------------------------------------- - - /** - * Utility method used at EndpointPushNotifier construction to monitor it for timeout. - * - * @param notifier The EndpointPushNotifier to monitor. 
- */ - private void monitorTimeout(EndpointPushNotifier notifier) - { /* Schedule a timeout only when a timeout manager is present; otherwise this method does nothing. */ - if (pushNotifierTimeoutManager != null) - pushNotifierTimeoutManager.scheduleTimeout(notifier); - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/DuplicateSessionException.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/endpoints/DuplicateSessionException.java b/core/src/flex/messaging/endpoints/DuplicateSessionException.java deleted file mode 100644 index 2cc6b0f..0000000 --- a/core/src/flex/messaging/endpoints/DuplicateSessionException.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.endpoints; - -import flex.messaging.MessageException; -import flex.messaging.log.LogEvent; - -/** - * Exception class used to indicate duplicate client sessions were detected. - * Instances carry the DUPLICATE_SESSION_DETECTED_CODE code, and the class overrides - * logging preferences to use the DEBUG level with stack trace logging disabled. 
- */ -public class DuplicateSessionException extends MessageException -{ - /** - * Code assigned to this exception by the default constructor. - */ - public static final String DUPLICATE_SESSION_DETECTED_CODE = "Server.Processing.DuplicateSessionDetected"; - - /** - * Serialization version identifier (standard Java serialVersionUID) for this class. - */ - private static final long serialVersionUID = -741704726700619666L; - - //-------------------------------------------------------------------------- - // - // Constructors - // - //-------------------------------------------------------------------------- - - /** - * Default constructor. - * Sets the code to a default value of <code>DUPLICATE_SESSION_DETECTED_CODE</code>. - */ - public DuplicateSessionException() - { - setCode(DUPLICATE_SESSION_DETECTED_CODE); - } - - //-------------------------------------------------------------------------- - // - // Properties - // - //-------------------------------------------------------------------------- - - //---------------------------------- - // preferredLogLevel - //---------------------------------- - - /** - * Override to log at the DEBUG level. - * - * @return Always <code>LogEvent.DEBUG</code>. - */ - @Override public short getPreferredLogLevel() - { - return LogEvent.DEBUG; - } - - //---------------------------------- - // logStackTraceEnabled - //---------------------------------- - - /** - * Override to suppress stack trace logging. - * - * @return Always <code>false</code>. - */ - @Override public boolean isLogStackTraceEnabled() - { - return false; - } -}
