http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java 
b/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
deleted file mode 100644
index 0353354..0000000
--- a/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints;
-
-import flex.messaging.FlexContext;
-import flex.messaging.FlexSession;
-import flex.messaging.client.FlexClient;
-import flex.messaging.client.FlushResult;
-import flex.messaging.client.PollFlushResult;
-import flex.messaging.client.PollWaitListener;
-import flex.messaging.client.UserAgentSettings;
-import flex.messaging.config.ConfigMap;
-import flex.messaging.config.ConfigurationConstants;
-import flex.messaging.log.Log;
-import flex.messaging.messages.CommandMessage;
-import flex.messaging.util.UserAgentManager;
-
-import java.util.Enumeration;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Base class for HTTP-based endpoints that support regular polling and long 
polling,
- * which means placing request threads that are polling for messages into a 
wait
- * state until messages are available for delivery or the configurable wait 
interval
- * is reached.
- */
-public abstract class BasePollingHTTPEndpoint extends BaseHTTPEndpoint 
implements PollWaitListener
-{
-    
//--------------------------------------------------------------------------
-    //
-    // Private Static Constants
-    //
-    
//--------------------------------------------------------------------------
-
-    private static final String POLLING_ENABLED = "polling-enabled";
-    private static final String POLLING_INTERVAL_MILLIS = 
"polling-interval-millis";
-    private static final String POLLING_INTERVAL_SECONDS = 
"polling-interval-seconds"; // Deprecated configuration option.
-    private static final String MAX_WAITING_POLL_REQUESTS = 
"max-waiting-poll-requests";
-    private static final String WAIT_INTERVAL_MILLIS = "wait-interval-millis";
-    private static final String CLIENT_WAIT_INTERVAL_MILLIS = 
"client-wait-interval-millis";
-    // Force clients that exceed the long-poll limit to wait at least this 
long between poll requests.
-    // This matches the default polling interval defined in the client 
PollingChannel.
-    private static final int DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS = 3000;
-
-    // User Agent based settings manager
-    private UserAgentManager userAgentManager = new UserAgentManager();
-
-    
//--------------------------------------------------------------------------
-    //
-    // Constructor
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Constructs an unmanaged <code>BasePollingHTTPEndpoint</code>.
-     */
-    public BasePollingHTTPEndpoint()
-    {
-        this(false);
-    }
-
-    /**
-     * Constructs an <code>BasePollingHTTPEndpoint</code> with the indicated 
management.
-     *
-     * @param enableManagement <code>true</code> if the 
<code>BasePollingHTTPEndpoint</code>
-     * is manageable; <code>false</code> otherwise.
-     */
-    public BasePollingHTTPEndpoint(boolean enableManagement)
-    {
-        super(enableManagement);
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Initialize, validate, start, and stop methods.
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Initializes the <code>Endpoint</code> with the properties.
-     * If subclasses override this method, they must call 
<code>super.initialize()</code>.
-     *
-     * @param id The ID of the <code>Endpoint</code>.
-     * @param properties Properties for the <code>Endpoint</code>.
-     */
-    @Override
-    public void initialize(String id, ConfigMap properties)
-    {
-        super.initialize(id, properties);
-
-        if (properties == null || properties.size() == 0)
-        {
-            // Initialize default user agent manager settings.
-            UserAgentManager.setupUserAgentManager(null, userAgentManager);
-
-            return; // Nothing else to initialize.
-        }
-
-        // General poll props.
-        pollingEnabled = properties.getPropertyAsBoolean(POLLING_ENABLED, 
false);
-        pollingIntervalMillis = 
properties.getPropertyAsLong(POLLING_INTERVAL_MILLIS, -1);
-        long pollingIntervalSeconds = 
properties.getPropertyAsLong(POLLING_INTERVAL_SECONDS, -1); // Deprecated
-        if (pollingIntervalSeconds > -1)
-            pollingIntervalMillis = pollingIntervalSeconds * 1000;
-
-        // Piggybacking props.
-        piggybackingEnabled = 
properties.getPropertyAsBoolean(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT,
 false);
-
-        // HTTP poll wait props.
-        maxWaitingPollRequests = 
properties.getPropertyAsInt(MAX_WAITING_POLL_REQUESTS, 0);
-        waitInterval = properties.getPropertyAsLong(WAIT_INTERVAL_MILLIS, 0);
-        clientWaitInterval = 
properties.getPropertyAsInt(CLIENT_WAIT_INTERVAL_MILLIS, 0);
-
-        // User Agent props.
-        UserAgentManager.setupUserAgentManager(properties, userAgentManager);
-
-        // Set initial state for the canWait flag based on whether we allow 
waits or not.
-        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval 
> 0))
-        {
-            waitEnabled = true;
-            canWait = true;
-        }
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Variables
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * This flag is volatile to allow for consistent reads across thread 
without
-     * needing to pay the cost for a synchronized lock for each read.
-     */
-    private volatile boolean canWait;
-
-    /**
-     * Used to synchronize sets and gets to the number of waiting clients.
-     */
-    protected final Object lock = new Object();
-
-    /**
-     * Set when properties are handled; used as a shortcut for logging to 
determine whether this
-     * instance attempts to put request threads in a wait state or not.
-     */
-    private boolean waitEnabled;
-
-    /**
-     * A count of the number of request threads that are currently in the wait 
state (including
-     * those on their way into or out of it).
-     */
-    protected int waitingPollRequestsCount;
-
-    /**
-     * A Map(notification Object for a waited request thread, Boolean.TRUE).
-     */
-    private ConcurrentHashMap currentWaitedRequests;
-
-    
//--------------------------------------------------------------------------
-    //
-    // Properties
-    //
-    
//--------------------------------------------------------------------------
-
-    //----------------------------------
-    //  clientWaitInterval
-    //----------------------------------
-
-    protected int clientWaitInterval = 0;
-
-    /**
-     * Retrieves the number of milliseconds the client will wait after 
receiving a response for
-     * a poll with server wait before it issues its next poll request.
-     * A value of zero or less causes the client to use its default polling 
interval (based on the
-     * channel's polling-interval-millis configuration) and this value is 
ignored.
-     * A value greater than zero will cause the client to wait for the 
specified interval before
-     * issuing its next poll request with a value of 1 triggering an immediate 
poll from the client
-     * as soon as a waited poll response is received.
-     * @return The client wait interval.
-     */
-    public int getClientWaitInterval()
-    {
-        return clientWaitInterval;
-    }
-
-    /**
-     * Sets the number of milliseconds a client will wait after receiving a 
response for a poll
-     * with server wait before it issues its next poll request.
-     * A value of zero or less causes the client to use its default polling 
interval (based on the
-     * channel's polling-interval-millis configuration) and this value is 
ignored.
-     * A value greater than zero will cause the client to wait for the 
specified interval before
-     * issuing its next poll request with a value of 1 triggering an immediate 
poll from the client
-     * as soon as a waited poll response is received.
-     * This property does not effect polling clients that poll the server 
without a server wait.
-     *
-     * @param value The number of milliseconds a client will wait before 
issuing its next poll when the
-     *        server is configured to wait.
-     */
-    public void setClientWaitInterval(int value)
-    {
-        clientWaitInterval = value;
-    }
-
-    //----------------------------------
-    //  maxWaitingPollRequests
-    //----------------------------------
-
-    protected int maxWaitingPollRequests = 0;
-
-    /**
-     * Retrieves the maximum number of server poll response threads that will 
be
-     * waiting for messages to arrive for clients.
-     * @return The maximum number of waiting poll requests.
-     */
-    public int getMaxWaitingPollRequests()
-    {
-        return maxWaitingPollRequests;
-    }
-
-    /**
-     * Sets the maximum number of server poll response threads that will be
-     * waiting for messages to arrive for clients.
-     *
-     * @param maxWaitingPollRequests The maximum number of server poll 
response threads
-     * that will be waiting for messages to arrive for the client.
-     */
-    public void setMaxWaitingPollRequests(int maxWaitingPollRequests)
-    {
-        this.maxWaitingPollRequests = maxWaitingPollRequests;
-        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval 
> 0))
-        {
-            waitEnabled = true;
-            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
-        }
-    }
-
-    //----------------------------------
-    //  pollingEnabled
-    //----------------------------------
-
-    /**
-     *
-     * This is a property used on the client.
-     */
-    protected boolean piggybackingEnabled;
-
-    //----------------------------------
-    //  pollingEnabled
-    //----------------------------------
-
-    /**
-     *
-     * This is a property used on the client.
-     */
-    protected boolean pollingEnabled;
-
-    //----------------------------------
-    //  pollingIntervalMillis
-    //----------------------------------
-
-    /**
-     *
-     * This is a property used on the client.
-     */
-    protected long pollingIntervalMillis = -1;
-
-    //----------------------------------
-    //  waitInterval
-    //----------------------------------
-
-    protected long waitInterval = 0;
-
-    /**
-     * Retrieves the number of milliseconds the server poll response thread 
will be
-     * waiting for messages to arrive for the client.
-     * @return The wait interval.
-     */
-    public long getWaitInterval()
-    {
-        return waitInterval;
-    }
-
-    /**
-     * Sets the number of milliseconds the server poll response thread will be
-     * waiting for messages to arrive for the client.
-     *
-     * @param waitInterval The number of milliseconds the server poll response 
thread will be
-     * waiting for messages to arrive for the client.
-     */
-    public void setWaitInterval(long waitInterval)
-    {
-        this.waitInterval = waitInterval;
-        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval 
> 0))
-        {
-            waitEnabled = true;
-            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
-        }
-    }
-
-    //----------------------------------
-    //  waitingPollRequestsCount
-    //----------------------------------
-
-    /**
-     * Retrieves the count of the number of request threads that are currently 
in the wait state
-     * (including those on their way into or out of it).
-     *
-     * @return The count of the number of request threads that are currently 
in the wait state.
-     */
-    public int getWaitingPollRequestsCount()
-    {
-        return waitingPollRequestsCount;
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Public Methods
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     *
-     * Returns a <code>ConfigMap</code> of endpoint properties that the client
-     * needs. This includes properties from <code>super.describeEndpoint</code>
-     * and additional <code>BaseHTTPEndpoint</code> specific properties under
-     * "properties" key.
-     */
-    @Override
-    public ConfigMap describeEndpoint()
-    {
-        ConfigMap endpointConfig = super.describeEndpoint();
-
-        boolean createdProperties = false;
-        ConfigMap properties = 
endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
-        if (properties == null)
-        {
-            properties = new ConfigMap();
-            createdProperties = true;
-        }
-
-        if (pollingEnabled)
-        {
-            ConfigMap pollingEnabled = new ConfigMap();
-            // Adding as a value rather than attribute to the parent
-            pollingEnabled.addProperty(EMPTY_STRING, TRUE_STRING);
-            properties.addProperty(POLLING_ENABLED, pollingEnabled);
-        }
-
-        if (pollingIntervalMillis > -1)
-        {
-            ConfigMap pollingInterval = new ConfigMap();
-            // Adding as a value rather than attribute to the parent
-            pollingInterval.addProperty(EMPTY_STRING, 
String.valueOf(pollingIntervalMillis));
-            properties.addProperty(POLLING_INTERVAL_MILLIS, pollingInterval);
-        }
-
-        if (piggybackingEnabled)
-        {
-            ConfigMap piggybackingEnabled = new ConfigMap();
-            // Adding as a value rather than attribute to the parent
-            piggybackingEnabled.addProperty(EMPTY_STRING, 
String.valueOf(piggybackingEnabled));
-            
properties.addProperty(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, 
piggybackingEnabled);
-        }
-
-        if (createdProperties && properties.size() > 0)
-            
endpointConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, 
properties);
-
-        return endpointConfig;
-    }
-
-    /**
-     * Sets up monitoring of waited poll requests so they can be notified and 
exit when the
-     * endpoint stops.
-     *
-     * @see flex.messaging.endpoints.AbstractEndpoint#start()
-     */
-    @Override
-    public void start()
-    {
-        if (isStarted())
-            return;
-
-        super.start();
-
-        currentWaitedRequests = new ConcurrentHashMap();
-    }
-
-    /**
-     * Ensures that no poll requests in a wait state are left un-notified when 
the endpoint stops.
-     *
-     * @see flex.messaging.endpoints.AbstractEndpoint#stop()
-     */
-    @Override
-    public void stop()
-    {
-        if (!isStarted())
-            return;
-
-        // Notify any currently waiting polls.
-        Enumeration keys = currentWaitedRequests.keys();
-        while (keys.hasMoreElements())
-        {
-            Object notifier = keys.nextElement();
-            synchronized (notifier)
-            {
-                notifier.notifyAll(); // Break any current waits.
-            }
-        }
-        currentWaitedRequests = null;
-
-        super.stop();
-    }
-
-    /**
-     * (non-Javaodc)
-     * @see flex.messaging.client.PollWaitListener#waitStart(Object)
-     */
-    public void waitStart(Object notifier)
-    {
-        currentWaitedRequests.put(notifier, Boolean.TRUE);
-    }
-
-    /**
-     * (non-Javaodc)
-     * @see flex.messaging.client.PollWaitListener#waitEnd(Object)
-     */
-    public void waitEnd(Object notifier)
-    {
-        if (currentWaitedRequests != null)
-            currentWaitedRequests.remove(notifier);
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Protected Methods
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Overrides the base poll handling to support optionally putting Http 
request handling threads
-     * into a wait state until messages are available to be delivered in the 
poll response or a timeout is reached.
-     * The number of threads that may be put in a wait state is bounded by 
<code>max-waiting-poll-requests</code>
-     * and waits will only be attempted if the canWait flag that is based on 
the <code>max-waiting-poll-requests</code>
-     * and the specified <code>wait-interval</code> is true.
-     *
-     * @param flexClient The FlexClient that issued the poll request.
-     * @param pollCommand The poll command from the client.
-     * @return The flush info used to build the poll response.
-     */
-    @Override
-    protected FlushResult handleFlexClientPoll(FlexClient flexClient, 
CommandMessage pollCommand)
-    {
-        FlushResult flushResult = null;
-        if (canWait && 
!pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
-        {
-            FlexSession session = FlexContext.getFlexSession();
-            // If canWait is true it means we currently have less than the max 
number of allowed waiting threads.
-
-            // We need to protect writes/reads to the wait count with the 
endpoint's lock.
-            // Also, we have to be careful to handle the case where two 
threads get to this point when only
-            // one wait spot remains; one thread will win and the other needs 
to revert to a non-waitable poll.
-            boolean thisThreadCanWait;
-            synchronized (lock)
-            {
-                ++waitingPollRequestsCount;
-                if (waitingPollRequestsCount == maxWaitingPollRequests)
-                {
-                    thisThreadCanWait = true; // This thread got the last wait 
spot.
-                    canWait = false;
-                }
-                else if (waitingPollRequestsCount > maxWaitingPollRequests)
-                {
-                    thisThreadCanWait = false; // This thread was beaten out 
for the last spot.
-                    --waitingPollRequestsCount; // Decrement the count because 
we're not going to try a poll with wait.
-                    canWait = false; // All the wait spots are currently 
occupied so prevent further attempts for now.
-                }
-                else
-                {
-                    // We haven't hit the limit yet, allow this thread to wait.
-                    thisThreadCanWait = true;
-                }
-            }
-
-            // Check the max waiting connections per session count
-            if (thisThreadCanWait)
-            {
-                String userAgentValue = 
FlexContext.getHttpRequest().getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
-                UserAgentSettings agentSettings = 
userAgentManager.match(userAgentValue);
-                synchronized(session)
-                {
-                    if (agentSettings != null)
-                        session.maxConnectionsPerSession = 
agentSettings.getMaxPersistentConnectionsPerSession();
-
-                    ++session.streamingConnectionsCount;
-                    if (session.maxConnectionsPerSession == 
FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
-                            || session.streamingConnectionsCount <= 
session.maxConnectionsPerSession)
-                    {
-                        thisThreadCanWait = true; // We haven't hit the limit 
yet, allow the wait.
-                    }
-                    else // (session.streamingConnectionsCount > 
session.maxConnectionsPerSession)
-                    {
-                        thisThreadCanWait = false; // no more from this client
-                        --session.streamingConnectionsCount;
-                    }
-                }
-
-                if (!thisThreadCanWait)
-                {
-                    // Decrement the waiting poll count, since this poll isn't 
going to wait.
-                    synchronized (lock)
-                    {
-                        --waitingPollRequestsCount;
-                        if (waitingPollRequestsCount < maxWaitingPollRequests)
-                            canWait = true;
-                    }
-                    if (Log.isDebug())
-                    {
-                        log.debug("Max long-polling requests per session limit 
(" + session.maxConnectionsPerSession + ") has been reached, this poll won't 
wait.");
-                    }
-                }
-
-            }
-
-            if (thisThreadCanWait)
-            {
-                if (Log.isDebug())
-                    log.debug("Number of waiting threads for endpoint with id 
'"+ getId() +"' is " + waitingPollRequestsCount + ".");
-
-                try
-                {
-                    flushResult  = flexClient.pollWithWait(getId(), 
FlexContext.getFlexSession(), this, waitInterval);
-                    if (flushResult != null)
-                    {
-                        // Prevent busy-polling due to multiple clients 
sharing a session and swapping each other out too quickly.
-                        if ((flushResult instanceof PollFlushResult) && 
((PollFlushResult)flushResult).isAvoidBusyPolling() && 
(flushResult.getNextFlushWaitTimeMillis() < 
DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
-                        {
-                            // Force the client polling interval to match the 
default defined in the client PollingChannel.
-                            
flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
-                        }
-                        else if ((clientWaitInterval > 0) && 
(flushResult.getNextFlushWaitTimeMillis() == 0))
-                        {
-                            // If the FlushResult doesn't specify it's own 
flush wait time, use the configured clientWaitInterval if defined.
-                            
flushResult.setNextFlushWaitTimeMillis(clientWaitInterval);
-                        }
-                    }
-                }
-                finally
-                {
-                    // We're done waiting so decrement the count of waiting 
threads and update the canWait flag if necessary
-                    synchronized (lock)
-                    {
-                        --waitingPollRequestsCount;
-                        if (waitingPollRequestsCount < maxWaitingPollRequests)
-                            canWait = true;
-                    }
-                    synchronized (session)
-                    {
-                        --session.streamingConnectionsCount;
-                    }
-
-                    if (Log.isDebug())
-                        log.debug("Number of waiting threads for endpoint with 
id '"+ getId() +"' is " + waitingPollRequestsCount + ".");
-                }
-            }
-        }
-        else if (Log.isDebug() && waitEnabled)
-        {
-            if 
(pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
-                log.debug("Suppressing poll wait for this request because it 
is part of a batch of messages to process.");
-            else
-                log.debug("Max waiting poll requests limit '" + 
maxWaitingPollRequests + "' has been reached for endpoint '" + getId() + "'. 
FlexClient with id '"+ flexClient.getId() + "' will poll with no wait.");
-        }
-
-        // If we weren't able to do a poll with wait above for any reason just 
run the base poll handling logic.
-        if (flushResult == null)
-        {
-            flushResult = super.handleFlexClientPoll(flexClient, pollCommand);
-            // If this is an excess poll request that we couldn't wait on, 
make sure the client doesn't poll the endpoint too aggressively.
-            // In this case, force a client wait to match the default polling 
interval defined in the client PollingChannel.
-            if ( waitEnabled && (pollingIntervalMillis < 
DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
-            {
-                if (flushResult == null)
-                {
-                    flushResult = new FlushResult();
-                }
-                
flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
-            }
-        }
-
-        return flushResult;
-    }
-}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java 
b/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java
deleted file mode 100644
index 7aa7328..0000000
--- a/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java
+++ /dev/null
@@ -1,1226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints;
-
-import flex.messaging.FlexContext;
-import flex.messaging.FlexSession;
-import flex.messaging.HttpFlexSession;
-import flex.messaging.MessageException;
-import flex.messaging.client.EndpointPushNotifier;
-import flex.messaging.client.FlexClient;
-import flex.messaging.client.FlushResult;
-import flex.messaging.client.UserAgentSettings;
-import flex.messaging.config.ConfigMap;
-import flex.messaging.log.Log;
-import flex.messaging.messages.AcknowledgeMessage;
-import flex.messaging.messages.AsyncMessage;
-import flex.messaging.messages.CommandMessage;
-import flex.messaging.messages.Message;
-import flex.messaging.messages.MessagePerformanceInfo;
-import flex.messaging.messages.MessagePerformanceUtils;
-import flex.messaging.util.TimeoutManager;
-import flex.messaging.util.UserAgentManager;
-
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * Base class for HTTP-based endpoints that support streaming HTTP connections 
to
- * connected clients.
- * Each streaming connection managed by this endpoint consumes one of the 
request
- * handler threads provided by the servlet container, so it is not highly 
scalable
- * but offers performance advantages over client polling for clients receiving
- * a steady, rapid stream of pushed messages.
- * This endpoint does not support polling clients and will fault any poll 
requests
- * that are received. To support polling clients use subclasses of
- * BaseHTTPEndpoint instead.
- */
-public abstract class BaseStreamingHTTPEndpoint extends BaseHTTPEndpoint
-{
-    
//--------------------------------------------------------------------------
-    //
-    // Private Static Constants
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * This token is used in chunked HTTP responses frequently so initialize 
it statically for general use.
-     */
-    private static final byte[] CRLF_BYTES = {(byte)13, (byte)10};
-
-    /**
-     * This token is used for the terminal chunk within a chunked response.
-     */
-    private static final byte ZERO_BYTE = (byte)48;
-
-    /**
-     * This token is used to signal that a chunk of data should be skipped by 
the client.
-     */
-    private static final byte NULL_BYTE = (byte)0;
-
-    /**
-     * Parameter name for 'command' passed in a request for a new streaming 
connection.
-     */
-    private static final String COMMAND_PARAM_NAME = "command";
-
-    /**
-     * This is the token at the end of the HTTP request line that indicates 
that it is
-     * a stream connection that should be held open to push data back to the 
client,
-     * as opposed to a regular request-response message.
-     */
-    private static final String OPEN_COMMAND = "open";
-
-    /**
-     * This is the token at the end of the HTTP request line that indicates 
that it is
-     * a stream connection that should be closed.
-     */
-    private static final String CLOSE_COMMAND = "close";
-
-    /**
-     *  Parameter name for the stream ID; passed with commands for an existing 
streaming connection.
-     */
-    private static final String STREAM_ID_PARAM_NAME = "streamId";
-
-    /**
-     * Constant for HTTP/1.0.
-     */
-    private static final String HTTP_1_0 = "HTTP/1.0";
-
-    /**
-     * Thread name suffix for request threads that are servicing a pinned open 
streaming connection.
-     */
-    private static final String STREAMING_THREAD_NAME_EXTENSION = 
"-in-streaming-mode";
-
-    /**
-     * Configuration constants.
-     */
-    private static final String PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES = 
"connection-idle-timeout-minutes";
-    private static final String 
PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES = "idle-timeout-minutes";
-    private static final String MAX_STREAMING_CLIENTS = 
"max-streaming-clients";
-    private static final String SERVER_TO_CLIENT_HEARTBEAT_MILLIS = 
"server-to-client-heartbeat-millis";
-    private static final String 
PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = 
"invalidate-messageclient-on-streaming-close";
-
-    /**
-     * Defaults.
-     */
-    private static final boolean 
DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = false;
-    private static final int DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS = 5000;
-    private static final int DEFAULT_MAX_STREAMING_CLIENTS = 10;
-
-    /**
-     * Errors.
-     */
-    public static final String POLL_NOT_SUPPORTED_CODE = 
"Server.PollNotSupported";
-    public static final int POLL_NOT_SUPPORTED_MESSAGE = 10034;
-
-
-    
//--------------------------------------------------------------------------
-    //
-    // Constructor
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Constructs an unmanaged <code>BaseStreamingHTTPEndpoint</code>.
-     */
-    public BaseStreamingHTTPEndpoint()
-    {
-        this(false);
-    }
-
-    /**
-     * Constructs an <code>BaseStreamingHTTPEndpoint</code> with the indicated 
management.
-     *
-     * @param enableManagement <code>true</code> if the 
<code>BaseStreamingHTTPEndpoint</code>
-     * is manageable; <code>false</code> otherwise.
-     */
-    public BaseStreamingHTTPEndpoint(boolean enableManagement)
-    {
-        super(enableManagement);
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Initialize, validate, start, and stop methods.
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Initializes the <code>Endpoint</code> with the properties.
-     * If subclasses override this method, they must call 
<code>super.initialize()</code>.
-     *
-     * @param id The ID of the <code>Endpoint</code>.
-     * @param properties Properties for the <code>Endpoint</code>.
-     */
-    @Override
-    public void initialize(String id, ConfigMap properties)
-    {
-        super.initialize(id, properties);
-
-        if (properties == null || properties.size() == 0)
-        {
-            // Initialize default user agent manager settings.
-            UserAgentManager.setupUserAgentManager(null, userAgentManager);
-
-            return; // Nothing else to initialize.
-        }
-
-        // The interval that the server will check if the client is still 
available.
-        serverToClientHeartbeatMillis = 
properties.getPropertyAsLong(SERVER_TO_CLIENT_HEARTBEAT_MILLIS, 
DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS);
-        setServerToClientHeartbeatMillis(serverToClientHeartbeatMillis);
-
-        
setInvalidateMessageClientOnStreamingClose(properties.getPropertyAsBoolean(PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE,
 DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE));
-
-        // Number of minutes a client can remain idle before the server times 
the connection out.
-        int connectionIdleTimeoutMinutes = 
properties.getPropertyAsInt(PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES, 
getConnectionIdleTimeoutMinutes());
-        if (connectionIdleTimeoutMinutes != 0)
-        {
-            setConnectionIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
-        }
-        else
-        {
-            connectionIdleTimeoutMinutes = 
properties.getPropertyAsInt(PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES, 
getConnectionIdleTimeoutMinutes());
-            if (connectionIdleTimeoutMinutes != 0)
-                setConnectionIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
-        }
-
-        // User-agent configuration for kick-start bytes and max streaming 
connections per session.
-        UserAgentManager.setupUserAgentManager(properties, userAgentManager);
-
-        // Maximum number of clients allowed to have streaming HTTP 
connections with the endpoint.
-        maxStreamingClients = 
properties.getPropertyAsInt(MAX_STREAMING_CLIENTS, 
DEFAULT_MAX_STREAMING_CLIENTS);
-
-        // Set initial state for the canWait flag based on whether we allow 
waits or not.
-        canStream = (maxStreamingClients > 0);
-    }
-
-
-    @Override
-    public void start()
-    {
-        if (isStarted())
-            return;
-
-        super.start();
-
-        if (connectionIdleTimeoutMinutes > 0)
-        {
-            pushNotifierTimeoutManager = new TimeoutManager(new ThreadFactory()
-                                                            {
-                                                                int counter = 
1;
-                                                                public 
synchronized Thread newThread(Runnable runnable)
-                                                                {
-                                                                    Thread t = 
new Thread(runnable);
-                                                                    
t.setName(getId() + "-StreamingConnectionTimeoutThread-" + counter++);
-                                                                    return t;
-                                                                }
-                                                            });
-        }
-
-        currentStreamingRequests = new ConcurrentHashMap<String, 
EndpointPushNotifier>();
-    }
-
-    /**
-     * (non-JavaDoc)
-     * @see flex.messaging.endpoints.AbstractEndpoint#stop()
-     */
-    @Override
-    public void stop()
-    {
-        if (!isStarted())
-            return;
-
-        // Shutdown the timeout manager for streaming connections cleanly.
-        if (pushNotifierTimeoutManager != null)
-        {
-            pushNotifierTimeoutManager.shutdown();
-            pushNotifierTimeoutManager = null;
-        }
-
-        // Shutdown any currently open streaming connections.
-        for (EndpointPushNotifier notifier : currentStreamingRequests.values())
-            notifier.close();
-
-        currentStreamingRequests = null;
-
-        super.stop();
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Variables
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Used to synchronize sets and gets to the number of streaming clients.
-     */
-    protected final Object lock = new Object();
-
-    /**
-     * Used to keep track of the mapping between user agent match strings and
-     * the bytes needed to kickstart their streaming connections.
-     */
-    protected UserAgentManager userAgentManager = new UserAgentManager();
-
-    /**
-     * This flag is volatile to allow for consistent reads across thread 
without
-     * needing to pay the cost for a synchronized lock for each read.
-     */
-    private volatile boolean canStream = true;
-
-    /**
-     * Manages timing out EndpointPushNotifier instances.
-     */
-    private volatile TimeoutManager pushNotifierTimeoutManager;
-
-    /**
-     * A Map(EndpointPushNotifier, Boolean.TRUE) containing all currently open 
streaming notifiers
-     * for this endpoint.
-     * Used for clean shutdown.
-     */
-    private ConcurrentHashMap<String, EndpointPushNotifier> 
currentStreamingRequests;
-
-    
//--------------------------------------------------------------------------
-    //
-    // Properties
-    //
-    
//--------------------------------------------------------------------------
-
-    //------------------------------------------
-    //  invalidateMessageClientOnStreamingClose
-    //-----------------------------------------
-
-    private volatile boolean invalidateMessageClientOnStreamingClose = 
DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE;
-
-    /**
-     * Returns whether invalidate-messageclient-on-streaming-close is enabled.
-     * See {@link 
BaseStreamingHTTPEndpoint#setInvalidateMessageClientOnStreamingClose(boolean)}
-     * for details.
-     *
-     * @return <code>true</code> if the 
invalidate-messageclient-on-streaming-close is enabled, <code>false</code> 
otherwise.
-     */
-    public boolean isInvalidateMessageClientOnStreamingClose()
-    {
-        return invalidateMessageClientOnStreamingClose;
-    }
-
-    /**
-     * Sets the invalidate-messageclient-on-streaming close property. If 
enabled,
-     * when the streaming connection is closed for whatever reason (for 
example, the client is gone),
-     * the client's associated MessageClient on the server is invalidated 
immediately.
-     * This is useful in scenarios where there is a constant stream of 
messages, the client is gone,
-     * and the streaming connection is closed, but the session has not timed 
out on the server yet.
-     * In that case, enabling this property will prevent messages accumulating 
on the session on behalf
-     * of the MessageClient that will invalidate.
-     * <p>
-     * Important: Do not enable this property when reliable messaging is used, 
otherwise
-     * reliable reconnect attempts will not happen correctly.</p>
-     *
-     * @param value The property value.
-     */
-    public void setInvalidateMessageClientOnStreamingClose(boolean value)
-    {
-        invalidateMessageClientOnStreamingClose = value;
-    }
-
-    //----------------------------------
-    //  serverToClientHeartbeatMillis
-    //----------------------------------
-
-    private long serverToClientHeartbeatMillis = 
DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS;
-
-    /**
-     * Retrieves the number of milliseconds the server will wait before 
writing a
-     * single <code>null</code> byte to the streaming connection, to ensure 
the client is
-     * still available.
-     * @return The server-to-client heartbeat time in milliseconds.
-     */
-    public long getServerToClientHeartbeatMillis()
-    {
-        return serverToClientHeartbeatMillis;
-    }
-
-    /**
-     * Retrieves the number of milliseconds the server will wait before 
writing a
-     * single <code>null</code> byte to the streaming connection to ensure the 
client is
-     * still available when there are no new messages for the client.
-     * A non-positive value means the server will wait forever for new 
messages and
-     * it will not write the <code>null</code> byte to determine if the client 
is available.
-     * @param serverToClientHeartbeatMillis The server-to-client heartbeat 
time in milliseconds.
-     */
-    public void setServerToClientHeartbeatMillis(long 
serverToClientHeartbeatMillis)
-    {
-        if (serverToClientHeartbeatMillis < 0)
-            serverToClientHeartbeatMillis = 0;
-        this.serverToClientHeartbeatMillis = serverToClientHeartbeatMillis;
-    }
-
-    //----------------------------------
-    //  connectionIdleTimeoutMinutes
-    //----------------------------------
-
-    private int connectionIdleTimeoutMinutes = 0;
-
-    /**
-     * Retrieves the number of minutes a client can remain idle before the 
server
-     * times the connection out. The default value is 0, indicating that 
connections
-     * will not be timed out and must be closed by the client or server, 
either explicitly
-     * or by either process terminating.
-     *
-     * @return The number of minutes a client can remain idle before the server
-     * times the connection out.
-     */
-    public int getConnectionIdleTimeoutMinutes()
-    {
-        return connectionIdleTimeoutMinutes;
-    }
-
-    /**
-     * Sets the number of minutes a client can remain idle before the server
-     * times the connection out. A value of 0 or below indicates that
-     * connections will not be timed out.
-     *
-     * @param value The number of minutes a client can remain idle
-     * before the server times the connection out.
-     */
-    public void setConnectionIdleTimeoutMinutes(int value)
-    {
-        if (value < 0)
-            value = 0;
-
-        this.connectionIdleTimeoutMinutes = value;
-    }
-
-    /**
-     * (non-JavaDoc)
-     * @deprecated Use {@link 
BaseStreamingHTTPEndpoint#getConnectionIdleTimeoutMinutes()} instead.
-     */
-    public int getIdleTimeoutMinutes()
-    {
-        return getConnectionIdleTimeoutMinutes();
-    }
-
-    /**
-     * (non-JavaDoc)
-     * @deprecated Use {@link 
BaseStreamingHTTPEndpoint#setConnectionIdleTimeoutMinutes(int)} instead.
-     */
-    public void setIdleTimeoutMinutes(int value)
-    {
-        setConnectionIdleTimeoutMinutes(value);
-    }
-
-    //----------------------------------
-    //  maxStreamingClients
-    //----------------------------------
-
-    private int maxStreamingClients = DEFAULT_MAX_STREAMING_CLIENTS;
-
-    /**
-     * Retrieves the maximum number of clients that will be allowed to 
establish
-     * a streaming HTTP connection with the endpoint.
-     *
-     * @return The maximum number of clients that will be allowed to establish
-     * a streaming HTTP connection with the endpoint.
-     */
-    public int getMaxStreamingClients()
-    {
-        return maxStreamingClients;
-    }
-
-    /**
-     * Sets the maximum number of clients that will be allowed to establish
-     * a streaming HTTP connection with the server.
-     *
-     * @param maxStreamingClients The maximum number of clients that will be 
allowed
-     * to establish a streaming HTTP connection with the server.
-     */
-    public void setMaxStreamingClients(int maxStreamingClients)
-    {
-        this.maxStreamingClients = maxStreamingClients;
-        canStream = (streamingClientsCount < maxStreamingClients);
-    }
-
-    //----------------------------------
-    //  streamingClientsCount
-    //----------------------------------
-
-    protected int streamingClientsCount;
-
-    /**
-     * Retrieves the the number of clients that are currently in the streaming 
state.
-     *
-     * @return The number of clients that are currently in the streaming state.
-     */
-    public int getStreamingClientsCount()
-    {
-        return streamingClientsCount;
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Public Methods
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Handles HTTP requests targetting this endpoint.
-     * Two types or requests are supported. If the request is a regular 
request-response AMF/AMFX
-     * message it is handled by the base logic in BaseHTTPEndpoint.service. 
However, if it is a
-     * request to open a streaming HTTP connection to the client this endpoint 
performs some
-     * validation checks and then holds the connection open to stream data 
back to the client
-     * over.
-     *
-     * @param req The original servlet request.
-     * @param res The active servlet response.
-     */
-    @Override
-    public void service(HttpServletRequest req, HttpServletResponse res)
-    {
-        String command = req.getParameter(COMMAND_PARAM_NAME);
-        if (command != null)
-            serviceStreamingRequest(req, res);
-        else // Let BaseHTTPEndpoint logic handle regular request-response 
messaging.
-            super.service(req, res);
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Protected Methods
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * If the message has MPI enabled, this method adds all the needed 
performance
-     * headers to the message.
-     *
-     * @param message Message to add performance headers to.
-     */
-    protected void addPerformanceInfo(Message message)
-    {
-        MessagePerformanceInfo mpiOriginal = getMPI(message);
-        if (mpiOriginal == null)
-            return;
-
-        MessagePerformanceInfo mpip = 
(MessagePerformanceInfo)mpiOriginal.clone();
-        try
-        {
-            // Set the original message info as the pushed causer info.
-            MessagePerformanceUtils.setMPIP(message, mpip);
-            MessagePerformanceUtils.setMPII(message, null);
-        }
-        catch (Exception e)
-        {
-            if (Log.isDebug())
-                log.debug("MPI exception while streaming the message: " + 
e.toString());
-        }
-
-        // Overhead only used when MPI is enabled for sizing
-        MessagePerformanceInfo mpio = new MessagePerformanceInfo();
-        if (mpip.recordMessageTimes)
-        {
-            mpio.sendTime = System.currentTimeMillis();
-            mpio.infoType = "OUT";
-        }
-        mpio.pushedFlag = true;
-        MessagePerformanceUtils.setMPIO(message, mpio);
-
-        // If MPI sizing information is enabled serialize again so that we 
know size
-        if (mpip.recordMessageSizes)
-        {
-            try
-            {
-                // Each subclass serializes the message in their own format to
-                // get the message size for the MPIO.
-                long serializationOverhead = System.currentTimeMillis();
-                mpio.messageSize = getMessageSizeForPerformanceInfo(message);
-
-                // Set serialization overhead to the time calculated during 
serialization above
-                if (mpip.recordMessageTimes)
-                {
-                    serializationOverhead = System.currentTimeMillis() - 
serializationOverhead;
-                    mpip.addToOverhead(serializationOverhead);
-                    mpiOriginal.addToOverhead(serializationOverhead);
-                    mpio.sendTime = System.currentTimeMillis();
-                }
-            }
-            catch(Exception e)
-            {
-                log.debug("MPI exception while streaming the message: " + 
e.toString());
-            }
-        }
-    }
-
-    /**
-     * Utility method to convert streamed push messages to their small versions
-     * if the channel-endpoint combination supports small messages.
-     *
-     * @param message The regular message.
-     * @return The small message if the message has a small version, or 
regular message
-     * if it doesn't .
-     */
-    protected Message convertPushMessageToSmall(Message message)
-    {
-        FlexSession session = FlexContext.getFlexSession();
-        if (session != null && session.useSmallMessages())
-            return convertToSmallMessage(message);
-        return message;
-    }
-
-    /**
-     * Used internally for performance information gathering; not intended for
-     * public use. The default implementation of this method returns zero.
-     * Subclasses should overwrite if they want to accurately report message
-     * size information in performance information gathering.
-     *
-     * @param message Message to get the size for.
-     *
-     * @return The size of the message after message is serialized.
-     */
-    protected long getMessageSizeForPerformanceInfo(Message message)
-    {
-        return 0;
-    }
-
-    /**
-     * This streaming endpoint does not support polling clients.
-     *
-     * @param flexClient The FlexClient that issued the poll request.
-     * @param pollCommand The poll command from the client.
-     * @return The flush info used to build the poll response.
-     */
-    @Override
-    protected FlushResult handleFlexClientPoll(FlexClient flexClient, 
CommandMessage pollCommand)
-    {
-        MessageException me = new MessageException();
-        me.setMessage(POLL_NOT_SUPPORTED_MESSAGE);
-        me.setDetails(POLL_NOT_SUPPORTED_MESSAGE);
-        me.setCode(POLL_NOT_SUPPORTED_CODE);
-        throw me;
-    }
-
-    /**
-     * Handles streaming connection open command sent by the FlexClient.
-     *
-     * @param req The <code>HttpServletRequest</code> to service.
-     * @param res The <code>HttpServletResponse</code> to be used in case an 
error
-     * has to be sent back.
-     * @param flexClient FlexClient that requested the streaming connection.
-     */
-    protected void handleFlexClientStreamingOpenRequest(HttpServletRequest 
req, HttpServletResponse res, FlexClient flexClient)
-    {
-        FlexSession session = FlexContext.getFlexSession();
-        if (canStream && session.canStream)
-        {
-            // If canStream/session.canStream is true it means we currently 
have
-            // less than the max number of allowed streaming threads, per 
endpoint/session.
-
-            // We need to protect writes/reads to the stream count with the 
endpoint's lock.
-            // Also, we have to be careful to handle the case where two 
threads get to this point when only
-            // one streaming spot remains; one thread will win and the other 
needs to fault.
-            boolean thisThreadCanStream;
-            synchronized (lock)
-            {
-                ++streamingClientsCount;
-                if (streamingClientsCount == maxStreamingClients)
-                {
-                    thisThreadCanStream = true; // This thread got the last 
spot.
-                    canStream = false;
-                }
-                else if (streamingClientsCount > maxStreamingClients)
-                {
-                    thisThreadCanStream = false; // This thread was beaten out 
for the last spot.
-                    --streamingClientsCount; // Decrement the count because 
we're not going to grant the streaming right to the client.
-                }
-                else
-                {
-                    // We haven't hit the limit yet, allow this thread to 
stream.
-                    thisThreadCanStream = true;
-                }
-            }
-
-            // If the thread cannot wait due to endpoint streaming connection
-            // limit, inform the client and return.
-            if (!thisThreadCanStream)
-            {
-                String errorMessage = "Endpoint with id '" + getId() + "' 
cannot grant streaming connection to FlexClient with id '"
-                        + flexClient.getId() + "' because " + 
MAX_STREAMING_CLIENTS + " limit of '"
-                        + maxStreamingClients + "' has been reached.";
-                if (Log.isError())
-                    log.error(errorMessage);
-                try
-                {
-                    errorMessage = "Endpoint with id '" + getId() + "' cannot 
grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + 
MAX_STREAMING_CLIENTS + " limit has been reached.";
-                    res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, 
errorMessage);
-                }
-                catch (IOException ignore)
-                {}
-                return;
-            }
-
-            // Setup for specific user agents.
-            byte[] kickStartBytesToStream = null;
-            String userAgentValue = 
req.getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
-            UserAgentSettings agentSettings = 
userAgentManager.match(userAgentValue);
-            if (agentSettings != null)
-            {
-                synchronized (session)
-                {
-                    session.maxConnectionsPerSession = 
agentSettings.getMaxPersistentConnectionsPerSession();
-                }
-
-                int kickStartBytes = agentSettings.getKickstartBytes();
-                if (kickStartBytes > 0)
-                {
-                    // Determine the minimum number of actual bytes that need 
to be sent to
-                    // kickstart, taking into account transfer-encoding 
overhead.
-                    try
-                    {
-                        int chunkLengthHeaderSize = 
Integer.toHexString(kickStartBytes).getBytes("ASCII").length;
-                        int chunkOverhead = chunkLengthHeaderSize + 4; // 4 
for the 2 wrapping CRLF tokens.
-                        int minimumKickstartBytes = kickStartBytes - 
chunkOverhead;
-                        kickStartBytesToStream = new 
byte[(minimumKickstartBytes > 0) ? minimumKickstartBytes :
-                                kickStartBytes];
-                    }
-                    catch (UnsupportedEncodingException ignore)
-                    {
-                        kickStartBytesToStream = new byte[kickStartBytes];
-                    }
-                    Arrays.fill(kickStartBytesToStream, NULL_BYTE);
-                }
-            }
-
-            // Now, check with the session before granting the streaming 
connection.
-            synchronized(session)
-            {
-                ++session.streamingConnectionsCount;
-                if (session.maxConnectionsPerSession == 
FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED)
-                {
-                    thisThreadCanStream = true;
-                }
-                else if (session.streamingConnectionsCount == 
session.maxConnectionsPerSession)
-                {
-                    thisThreadCanStream = true; // This thread got the last 
spot in the session.
-                    session.canStream = false;
-                }
-                else if (session.streamingConnectionsCount > 
session.maxConnectionsPerSession)
-                {
-                    thisThreadCanStream = false; // This thread was beaten out 
for the last spot.
-                    --session.streamingConnectionsCount;
-                    synchronized(lock)
-                    {
-                        // Decrement the endpoint count because we're not 
going to grant the streaming right to the client.
-                        --streamingClientsCount;
-                    }
-                }
-                else
-                {
-                    // We haven't hit the limit yet, allow this thread to 
stream.
-                    thisThreadCanStream = true;
-                }
-            }
-
-            // If the thread cannot wait due to session streaming connection
-            // limit, inform the client and return.
-            if (!thisThreadCanStream)
-            {
-                if (Log.isError())
-                    log.error("Endpoint with id '" + getId() + "' cannot grant 
streaming connection to FlexClient with id '"
-                            + flexClient.getId() + "' because " + 
UserAgentManager.MAX_PERSISTENT_CONNECTIONS_PER_SESSION + " limit of '" + 
session.maxConnectionsPerSession
-                            + ((agentSettings != null) ? "' for user-agent '" 
+ agentSettings.getMatchOn() + "'" : "") +  " has been reached." );
-                try
-                {
-                    // Return an HTTP status code 400.
-                    String errorMessage = "The server cannot grant streaming 
connection to this client because limit has been reached.";
-                    res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, 
errorMessage);
-                }
-                catch (IOException ignore)
-                {
-                    // NOWARN
-                }
-                return;
-            }
-
-            Thread currentThread = Thread.currentThread();
-            String threadName = currentThread.getName();
-            EndpointPushNotifier notifier = null;
-            boolean suppressIOExceptionLogging = false; // Used to suppress 
logging for IO exception.
-            try
-            {
-                currentThread.setName(threadName + 
STREAMING_THREAD_NAME_EXTENSION);
-
-                // Open and commit response headers and get output stream.
-                if (addNoCacheHeaders)
-                    addNoCacheHeaders(req, res);
-                res.setContentType(getResponseContentType());
-                res.setHeader("Transfer-Encoding", "chunked");
-                res.setHeader("Connection", "close");
-                ServletOutputStream os = res.getOutputStream();
-                res.flushBuffer();
-
-                // If kickstart-bytes are specified, stream them.
-                if (kickStartBytesToStream != null)
-                {
-                    if (Log.isDebug())
-                        log.debug("Endpoint with id '" + getId() + "' is 
streaming " + kickStartBytesToStream.length
-                                + " bytes (not counting chunk encoding 
overhead) to kick-start the streaming connection for FlexClient with id '"
-                                + flexClient.getId() + "'.");
-
-                    streamChunk(kickStartBytesToStream, os, res);
-                }
-
-                // Setup serialization and type marshalling contexts
-                setThreadLocals();
-
-                // Activate streaming helper for this connection.
-                // Watch out for duplicate stream issues.
-                try
-                {
-                    notifier = new EndpointPushNotifier(this, flexClient);
-                }
-                catch (MessageException me)
-                {
-                    if (me.getNumber() == 10033) // It's a duplicate stream 
request from the same FlexClient. Leave the current stream in place and fault 
this.
-                    {
-                        if (Log.isWarn())
-                            log.warn("Endpoint with id '" + getId() + "' 
received a duplicate streaming connection request from, FlexClient with id '"
-                                    + flexClient.getId() + "'. Faulting 
request.");
-
-                        // Rollback counters and send an error response.
-                        synchronized (lock)
-                        {
-                            --streamingClientsCount;
-                            canStream = (streamingClientsCount < 
maxStreamingClients);
-                            synchronized (session)
-                            {
-                                --session.streamingConnectionsCount;
-                                session.canStream = 
(session.maxConnectionsPerSession == 
FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
-                                        || session.streamingConnectionsCount < 
session.maxConnectionsPerSession);
-                            }
-                        }
-                        try
-                        {
-                            res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-                        }
-                        catch (IOException ignore)
-                        {
-                            // NOWARN
-                        }
-                        return; // Exit early.
-                    }
-                }
-                if (connectionIdleTimeoutMinutes > 0)
-                    
notifier.setIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
-                notifier.setLogCategory(getLogCategory());
-                monitorTimeout(notifier);
-                currentStreamingRequests.put(notifier.getNotifierId(), 
notifier);
-
-                // Push down an acknowledgement for the 'connect' request 
containing the unique id for this specific stream.
-                AcknowledgeMessage connectAck = new AcknowledgeMessage();
-                connectAck.setBody(notifier.getNotifierId());
-                
connectAck.setCorrelationId(BaseStreamingHTTPEndpoint.OPEN_COMMAND);
-                ArrayList toPush = new ArrayList(1);
-                toPush.add(connectAck);
-                streamMessages(toPush, os, res);
-
-                // Output session level streaming count.
-                if (Log.isDebug())
-                    
Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming 
clients for FlexSession with id '"+ session.getId() +"' is " + 
session.streamingConnectionsCount + ".");
-
-                // Output endpoint level streaming count.
-                if (Log.isDebug())
-                    log.debug("Number of streaming clients for endpoint with 
id '"+ getId() +"' is " + streamingClientsCount + ".");
-
-                // And cycle in a wait-notify loop with the aid of the helper 
until it
-                // is closed, we're interrupted or the act of streaming data 
to the client fails.
-                while (!notifier.isClosed())
-                {
-                    try
-                    {
-                        // Drain any messages that might have been accumulated
-                        // while the previous drain was being processed.
-                        List<AsyncMessage> messages = null;
-                        synchronized (notifier.pushNeeded)
-                        {
-                            messages = notifier.drainMessages();
-                        }
-                        streamMessages(messages, os, res);
-
-                        synchronized (notifier.pushNeeded)
-                        {
-                            
notifier.pushNeeded.wait(serverToClientHeartbeatMillis);
-                        
-                            messages = notifier.drainMessages();
-                        }
-                        // If there are no messages to send to the client, 
send an null
-                        // byte as a heartbeat to make sure the client is 
still valid.
-                        if (messages == null && serverToClientHeartbeatMillis 
> 0)
-                        {
-                            try
-                            {
-                                os.write(NULL_BYTE);
-                                res.flushBuffer();
-                            }
-                            catch (IOException e)
-                            {
-                                if (Log.isWarn())
-                                    log.warn("Endpoint with id '" + getId() + 
"' is closing the streaming connection to FlexClient with id '"
-                                            + flexClient.getId() + "' because 
endpoint encountered a socket write error" +
-                                            ", possibly due to an unresponsive 
FlexClient.", e);
-                                break; // Exit the wait loop.
-                            }
-                        }
-                        // Otherwise stream the messages to the client.
-                        else
-                        {
-                            // Update the last time notifier was used to drain 
messages.
-                            // Important for idle timeout detection.
-                            notifier.updateLastUse();
-
-                            streamMessages(messages, os, res);
-                        }
-                    }
-                    catch (InterruptedException e)
-                    {
-                        if (Log.isWarn())
-                            log.warn("Streaming thread '" + threadName + "' 
for endpoint with id '" + getId() + "' has been interrupted and the streaming 
connection will be closed.");
-                        os.close();
-                        break; // Exit the wait loop.
-                    }
-
-                    // Update the FlexClient last use time to prevent 
FlexClient from
-                    // timing out when the client is still subscribed. It is 
important
-                    // to do this outside synchronized(notifier.pushNeeded) to 
avoid
-                    // thread deadlock!
-                    flexClient.updateLastUse();
-                }
-                if (Log.isDebug())
-                    log.debug("Streaming thread '" + threadName + "' for 
endpoint with id '" + getId() + "' is releasing connection and returning to the 
request handler pool.");
-                suppressIOExceptionLogging = true;
-                // Terminate the response.
-                streamChunk(null, os, res);
-            }
-            catch (IOException e)
-            {
-                if (Log.isWarn() && !suppressIOExceptionLogging)
-                    log.warn("Streaming thread '" + threadName + "' for 
endpoint with id '" + getId() + "' is closing connection due to an IO error.", 
e);
-            }
-            finally
-            {
-                currentThread.setName(threadName);
-
-                // We're done so decrement the counts for streaming threads,
-                // and update the canStream flag if necessary.
-                synchronized (lock)
-                {
-                    --streamingClientsCount;
-                    canStream = (streamingClientsCount < maxStreamingClients);
-                    synchronized (session)
-                    {
-                        --session.streamingConnectionsCount;
-                        session.canStream = (session.maxConnectionsPerSession 
== FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
-                                || session.streamingConnectionsCount < 
session.maxConnectionsPerSession);
-                    }
-                }
-
-                if (notifier != null && currentStreamingRequests != null)
-                {
-                    currentStreamingRequests.remove(notifier.getNotifierId());
-                    notifier.close();
-                }
-
-                // Output session level streaming count.
-                if (Log.isDebug())
-                    
Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming 
clients for FlexSession with id '"+ session.getId() +"' is " + 
session.streamingConnectionsCount + ".");
-
-                // Output endpoint level streaming count.
-                if (Log.isDebug())
-                    log.debug("Number of streaming clients for endpoint with 
id '"+ getId() +"' is " + streamingClientsCount + ".");
-            }
-        }
-        // Otherwise, client's streaming connection open request could not be 
granted.
-        else
-        {
-            if (Log.isError())
-            {
-                String logString = null;
-                if (!canStream)
-                {
-                    logString = "Endpoint with id '" + getId() + "' cannot 
grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + 
MAX_STREAMING_CLIENTS + " limit of '"
-                    + maxStreamingClients + "' has been reached.";
-                }
-                else if (!session.canStream)
-                {
-                    logString = "Endpoint with id '" + getId() + "' cannot 
grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + 
UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit of '"
-                    + session.maxConnectionsPerSession + "' has been reached.";
-                }
-                if (logString != null)
-                    log.error(logString);
-            }
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that client 
request can't be processed.
-                String errorMessage = null;
-                if (!canStream)
-                {
-                    errorMessage = "Endpoint with id '" + getId() + "' cannot 
grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + 
MAX_STREAMING_CLIENTS + " limit has been reached.";
-                }
-                else if (!session.canStream)
-                {
-                    errorMessage = "Endpoint with id '" + getId() + "' cannot 
grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + 
UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit has been 
reached.";
-                }
-                res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, 
errorMessage);
-            }
-            catch (IOException ignore)
-            {}
-        }
-    }
-
-    /**
-     * Handles streaming connection close command sent by the FlexClient.
-     *
-     * @param req The <code>HttpServletRequest</code> to service.
-     * @param res The <code>HttpServletResponse</code> to be used in case an 
error
-     * has to be sent back.
-     * @param flexClient FlexClient that requested the streaming connection.
-     * @param streamId The id for the stream to close.
-     */
-    protected void handleFlexClientStreamingCloseRequest(HttpServletRequest 
req, HttpServletResponse res, FlexClient flexClient, String streamId)
-    {
-        if (streamId != null)
-        {
-            EndpointPushNotifier notifier = 
(EndpointPushNotifier)flexClient.getEndpointPushHandler(getId());
-            if ((notifier != null) && 
notifier.getNotifierId().equals(streamId))
-                notifier.close();
-        }
-    }
-
-    /**
-     * Service streaming connection commands.
-     *
-     * @param req The <code>HttpServletRequest</code> to service.
-     * @param res The <code>HttpServletResponse</code> to be used in case an 
error
-     * has to be sent back.
-     */
-    protected void serviceStreamingRequest(HttpServletRequest req, 
HttpServletResponse res)
-    {
-        // If this is a request for a streaming connection, make sure it's for 
a valid FlexClient
-        // and that the FlexSession doesn't already have a streaming 
connection open.
-        // Streaming requests are POSTs (to help prevent the possibility of 
caching) that carry the
-        // following parameters:
-        // command - Indicating a custom command for the endpoint; currently 
'open' to request a new
-        //           streaming connection be opened, and 'close' to request 
the streaming connection
-        //           to close.
-        // version - Indicates the streaming connection 'version' to use; it's 
here for backward comp. support
-        //           if we need to change how commands are handled in a future 
product release.
-        // DSId - The FlexClient id value that uniquely identifies the swf 
making the request.
-        String command = req.getParameter(COMMAND_PARAM_NAME);
-
-        // Only HTTP 1.1 is supported, disallow HTTP 1.0.
-        if (req.getProtocol().equals(HTTP_1_0))
-        {
-            if (Log.isError())
-                log.error("Endpoint with id '" + getId() + "' cannot service 
the streaming request made with " +
-                " HTTP 1.0. Only HTTP 1.1 is supported.");
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that the 
client's request was syntactically invalid (bad command).
-                res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-            }
-            catch (IOException ignore)
-            {}
-            return; // Abort further server processing.
-        }
-
-        if (!(command.equals(OPEN_COMMAND) || command.equals(CLOSE_COMMAND)))
-        {
-            if (Log.isError())
-                log.error("Endpoint with id '" + getId() + "' cannot service 
the streaming request as the supplied command '"
-                        + command + "' is invalid.");
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that the 
client's request was syntactically invalid (bad command).
-                res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-            }
-            catch (IOException ignore)
-            {}
-            return; // Abort further server processing.
-        }
-
-        String flexClientId = req.getParameter(Message.FLEX_CLIENT_ID_HEADER);
-        if (flexClientId == null)
-        {
-            if (Log.isError())
-                log.error("Endpoint with id '" + getId() + "' cannot service 
the streaming request as no FlexClient id"
-                        + " has been supplied in the request.");
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that the 
client's request was syntactically invalid (missing id).
-                res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-            }
-            catch (IOException ignore)
-            {}
-            return; // Abort further server processing.
-        }
-
-        // Validate that the provided FlexClient id exists and is associated 
with the current session.
-        // We don't do this validation with CLOSE_COMMAND because 
CLOSE_COMMAND can come in on a
-        // different session. For example, when the session expires due to 
timeout, the streaming client
-        // using that session sends a CLOSE_COMMAND on a new session to let 
the server know to clean client's
-        // corresponding server constructs. In that case, server already knows 
that session has expired so
-        // we can simply omit this validation.
-        FlexClient flexClient = null;
-        List<FlexClient> flexClients = 
FlexContext.getFlexSession().getFlexClients();
-        boolean validFlexClientId = false;
-        for (Iterator<FlexClient> iter = flexClients.iterator(); 
iter.hasNext();)
-        {
-            flexClient = iter.next();
-            if (flexClient.getId().equals(flexClientId) && 
flexClient.isValid())
-            {
-                validFlexClientId = true;
-                break;
-            }
-        }
-        if (!command.equals(CLOSE_COMMAND) && !validFlexClientId)
-        {
-            if (Log.isError())
-                log.error("Endpoint with id '" + getId() + "' cannot service 
the streaming request as either the supplied"
-                        + " FlexClient id '" + flexClientId + " is not valid, 
or the FlexClient with that id is not valid.");
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that the 
client's request was syntactically invalid (invalid id).
-                res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-            }
-            catch (IOException ignore)
-            {}
-            return; // Abort further server processing.
-        }
-
-        // If a close command is received and we don't have any flex clients 
registered simply invalidate 
-        // the Flex Session. This will take care of the Flex Session that got 
created when the MB servlet 
-        // was processing the CLOSE request.
-        if (command.equals(CLOSE_COMMAND) && flexClients.size() == 0)
-        {
-            FlexSession flexSession = FlexContext.getFlexSession();
-            if (flexSession instanceof HttpFlexSession)
-            {
-                ((HttpFlexSession)flexSession).invalidate(false);
-            }
-            return;
-        }
-
-        if (flexClient != null)
-        {
-            if (command.equals(OPEN_COMMAND))
-                handleFlexClientStreamingOpenRequest(req, res, flexClient);
-            else if (command.equals(CLOSE_COMMAND))
-                handleFlexClientStreamingCloseRequest(req, res, flexClient, 
req.getParameter(STREAM_ID_PARAM_NAME));
-        }
-    }
-
-    /**
-     * Helper method to write a chunk of bytes to the output stream in an HTTP
-     * "Transfer-Encoding: chunked" format.
-     * If the bytes array is null or empty, a terminal chunk will be written to
-     * signal the end of the response.
-     * Once the chunk is written to the output stream, the stream will be 
flushed immediately (no buffering).
-     *
-     * @param bytes The array of bytes to write as a chunk in the response; or 
if null, the signal to write the final chunk to complete the response.
-     * @param os The output stream the chunk will be written to.
-     * @param response The HttpServletResponse, used to flush the chunk to the 
client.
-     *
-     * @throws IOException if writing the chunk to the output stream fails.
-     */
-    protected void streamChunk(byte[] bytes, ServletOutputStream os, 
HttpServletResponse response) throws IOException
-    {
-        if ((bytes != null) && (bytes.length > 0))
-        {
-            byte[] chunkLength = 
Integer.toHexString(bytes.length).getBytes("ASCII");
-            os.write(chunkLength);
-            os.write(CRLF_BYTES);
-            os.write(bytes);
-            os.write(CRLF_BYTES);
-            response.flushBuffer();
-        }
-        else // Send final 'EOF' chunk for the response.
-        {
-            os.write(ZERO_BYTE);
-            os.write(CRLF_BYTES);
-            response.flushBuffer();
-        }
-    }
-
-    /**
-     * Helper method invoked by the endpoint request handler thread cycling in 
wait-notify.
-     * Serializes messages and streams each to the client as a response chunk 
using streamChunk().
-     *
-     * @param messages The messages to serialize and push to the client.
-     * @param os The output stream the chunk will be written to.
-     * @param response The HttpServletResponse, used to flush the chunk to the 
client.
-     */
-    protected abstract void streamMessages(List messages, ServletOutputStream 
os, HttpServletResponse response) throws IOException;
-
-    /**
-     * Given a message, returns the MessagePerformanceInfo object if the 
message
-     * performance gathering is enabled, returns null otherwise.
-     *
-     * @param message The message.
-     * @return MessagePerformanceInfo if the message performance gathering is 
enabled,
-     * null otherwise.
-     */
-    protected MessagePerformanceInfo getMPI(Message message)
-    {
-        return (isRecordMessageSizes() || isRecordMessageTimes())?
-                MessagePerformanceUtils.getMPII(message) : null;
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Private methods.
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Utility method used at EndpointPushNotifier construction to monitor it 
for timeout.
-     *
-     * @param notifier The EndpointPushNotifier to monitor.
-     */
-    private void monitorTimeout(EndpointPushNotifier notifier)
-    {
-        if (pushNotifierTimeoutManager != null)
-            pushNotifierTimeoutManager.scheduleTimeout(notifier);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/endpoints/DuplicateSessionException.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/endpoints/DuplicateSessionException.java 
b/core/src/flex/messaging/endpoints/DuplicateSessionException.java
deleted file mode 100644
index 2cc6b0f..0000000
--- a/core/src/flex/messaging/endpoints/DuplicateSessionException.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints;
-
-import flex.messaging.MessageException;
-import flex.messaging.log.LogEvent;
-
-/**
- * Exception class used to indicate duplicate client sessions were detected.
- */
-public class DuplicateSessionException extends MessageException
-{
-    /**
-     *
-     */
-    public static final String DUPLICATE_SESSION_DETECTED_CODE = 
"Server.Processing.DuplicateSessionDetected";
-
-    /**
-     *
-     */
-    private static final long serialVersionUID = -741704726700619666L;
-
-    
//--------------------------------------------------------------------------
-    //
-    // Constructors
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Default constructor.
-     * Sets the code to a default value of 
<code>DUPLICATE_SESSION_DETECTED_CODE</code>.
-     */
-    public DuplicateSessionException()
-    {
-        setCode(DUPLICATE_SESSION_DETECTED_CODE);
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Properties
-    //
-    
//--------------------------------------------------------------------------
-
-    //----------------------------------
-    //  preferredLogLevel
-    //----------------------------------
-
-    /**
-     * Override to log at the DEBUG level.
-     */
-    @Override public short getPreferredLogLevel()
-    {
-        return LogEvent.DEBUG;
-    }
-
-    //----------------------------------
-    //  logStackTraceEnabled
-    //----------------------------------
-
-    /**
-     * Override to suppress stack trace logging.
-     */
-    @Override public boolean isLogStackTraceEnabled()
-    {
-        return false;
-    }
-}

Reply via email to