Repository: flex-blazeds
Updated Branches:
  refs/heads/develop e1ad55788 -> bf2e1dc9b


http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java 
b/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
new file mode 100644
index 0000000..3c29fae
--- /dev/null
+++ b/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints;
+
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.client.FlexClient;
+import flex.messaging.client.FlushResult;
+import flex.messaging.client.PollFlushResult;
+import flex.messaging.client.PollWaitListener;
+import flex.messaging.client.UserAgentSettings;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.config.ConfigurationConstants;
+import flex.messaging.log.Log;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.util.UserAgentManager;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base class for HTTP-based endpoints that support regular polling and long 
polling,
+ * which means placing request threads that are polling for messages into a 
wait
+ * state until messages are available for delivery or the configurable wait 
interval
+ * is reached.
+ */
+public abstract class BasePollingHTTPEndpoint extends BaseHTTPEndpoint 
implements PollWaitListener
+{
+    
//--------------------------------------------------------------------------
+    //
+    // Private Static Constants
+    //
+    
//--------------------------------------------------------------------------
+
+    private static final String POLLING_ENABLED = "polling-enabled";
+    private static final String POLLING_INTERVAL_MILLIS = 
"polling-interval-millis";
+    private static final String POLLING_INTERVAL_SECONDS = 
"polling-interval-seconds"; // Deprecated configuration option.
+    private static final String MAX_WAITING_POLL_REQUESTS = 
"max-waiting-poll-requests";
+    private static final String WAIT_INTERVAL_MILLIS = "wait-interval-millis";
+    private static final String CLIENT_WAIT_INTERVAL_MILLIS = 
"client-wait-interval-millis";
+    // Force clients that exceed the long-poll limit to wait at least this 
long between poll requests.
+    // This matches the default polling interval defined in the client 
PollingChannel.
+    private static final int DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS = 3000;
+
+    // User Agent based settings manager
+    private UserAgentManager userAgentManager = new UserAgentManager();
+
+    
//--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    
//--------------------------------------------------------------------------
+
+    /**
+     * Constructs an unmanaged <code>BasePollingHTTPEndpoint</code>.
+     */
+    public BasePollingHTTPEndpoint()
+    {
+        this(false);
+    }
+
+    /**
+     * Constructs an <code>BasePollingHTTPEndpoint</code> with the indicated 
management.
+     *
+     * @param enableManagement <code>true</code> if the 
<code>BasePollingHTTPEndpoint</code>
+     * is manageable; <code>false</code> otherwise.
+     */
+    public BasePollingHTTPEndpoint(boolean enableManagement)
+    {
+        super(enableManagement);
+    }
+
+    
//--------------------------------------------------------------------------
+    //
+    // Initialize, validate, start, and stop methods.
+    //
+    
//--------------------------------------------------------------------------
+
+    /**
+     * Initializes the <code>Endpoint</code> with the properties.
+     * If subclasses override this method, they must call 
<code>super.initialize()</code>.
+     *
+     * @param id The ID of the <code>Endpoint</code>.
+     * @param properties Properties for the <code>Endpoint</code>.
+     */
+    @Override
+    public void initialize(String id, ConfigMap properties)
+    {
+        super.initialize(id, properties);
+
+        if (properties == null || properties.size() == 0)
+        {
+            // Initialize default user agent manager settings.
+            UserAgentManager.setupUserAgentManager(null, userAgentManager);
+
+            return; // Nothing else to initialize.
+        }
+
+        // General poll props.
+        pollingEnabled = properties.getPropertyAsBoolean(POLLING_ENABLED, 
false);
+        pollingIntervalMillis = 
properties.getPropertyAsLong(POLLING_INTERVAL_MILLIS, -1);
+        long pollingIntervalSeconds = 
properties.getPropertyAsLong(POLLING_INTERVAL_SECONDS, -1); // Deprecated
+        if (pollingIntervalSeconds > -1)
+            pollingIntervalMillis = pollingIntervalSeconds * 1000;
+
+        // Piggybacking props.
+        piggybackingEnabled = 
properties.getPropertyAsBoolean(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT,
 false);
+
+        // HTTP poll wait props.
+        maxWaitingPollRequests = 
properties.getPropertyAsInt(MAX_WAITING_POLL_REQUESTS, 0);
+        waitInterval = properties.getPropertyAsLong(WAIT_INTERVAL_MILLIS, 0);
+        clientWaitInterval = 
properties.getPropertyAsInt(CLIENT_WAIT_INTERVAL_MILLIS, 0);
+
+        // User Agent props.
+        UserAgentManager.setupUserAgentManager(properties, userAgentManager);
+
+        // Set initial state for the canWait flag based on whether we allow 
waits or not.
+        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval 
> 0))
+        {
+            waitEnabled = true;
+            canWait = true;
+        }
+    }
+
+    
//--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    
//--------------------------------------------------------------------------
+
+    /**
+     * This flag is volatile to allow for consistent reads across thread 
without
+     * needing to pay the cost for a synchronized lock for each read.
+     */
+    private volatile boolean canWait;
+
+    /**
+     * Used to synchronize sets and gets to the number of waiting clients.
+     */
+    protected final Object lock = new Object();
+
+    /**
+     * Set when properties are handled; used as a shortcut for logging to 
determine whether this
+     * instance attempts to put request threads in a wait state or not.
+     */
+    private boolean waitEnabled;
+
+    /**
+     * A count of the number of request threads that are currently in the wait 
state (including
+     * those on their way into or out of it).
+     */
+    protected int waitingPollRequestsCount;
+
+    /**
+     * A Map(notification Object for a waited request thread, Boolean.TRUE).
+     */
+    private ConcurrentHashMap currentWaitedRequests;
+
+    
//--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    
//--------------------------------------------------------------------------
+
+    //----------------------------------
+    //  clientWaitInterval
+    //----------------------------------
+
+    protected int clientWaitInterval = 0;
+
+    /**
+     * Retrieves the number of milliseconds the client will wait after 
receiving a response for
+     * a poll with server wait before it issues its next poll request.
+     * A value of zero or less causes the client to use its default polling 
interval (based on the
+     * channel's polling-interval-millis configuration) and this value is 
ignored.
+     * A value greater than zero will cause the client to wait for the 
specified interval before
+     * issuing its next poll request with a value of 1 triggering an immediate 
poll from the client
+     * as soon as a waited poll response is received.
+     * @return The client wait interval.
+     */
+    public int getClientWaitInterval()
+    {
+        return clientWaitInterval;
+    }
+
+    /**
+     * Sets the number of milliseconds a client will wait after receiving a 
response for a poll
+     * with server wait before it issues its next poll request.
+     * A value of zero or less causes the client to use its default polling 
interval (based on the
+     * channel's polling-interval-millis configuration) and this value is 
ignored.
+     * A value greater than zero will cause the client to wait for the 
specified interval before
+     * issuing its next poll request with a value of 1 triggering an immediate 
poll from the client
+     * as soon as a waited poll response is received.
+     * This property does not effect polling clients that poll the server 
without a server wait.
+     *
+     * @param value The number of milliseconds a client will wait before 
issuing its next poll when the
+     *        server is configured to wait.
+     */
+    public void setClientWaitInterval(int value)
+    {
+        clientWaitInterval = value;
+    }
+
+    //----------------------------------
+    //  maxWaitingPollRequests
+    //----------------------------------
+
+    protected int maxWaitingPollRequests = 0;
+
+    /**
+     * Retrieves the maximum number of server poll response threads that will 
be
+     * waiting for messages to arrive for clients.
+     * @return The maximum number of waiting poll requests.
+     */
+    public int getMaxWaitingPollRequests()
+    {
+        return maxWaitingPollRequests;
+    }
+
+    /**
+     * Sets the maximum number of server poll response threads that will be
+     * waiting for messages to arrive for clients.
+     *
+     * @param maxWaitingPollRequests The maximum number of server poll 
response threads
+     * that will be waiting for messages to arrive for the client.
+     */
+    public void setMaxWaitingPollRequests(int maxWaitingPollRequests)
+    {
+        this.maxWaitingPollRequests = maxWaitingPollRequests;
+        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval 
> 0))
+        {
+            waitEnabled = true;
+            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
+        }
+    }
+
+    //----------------------------------
+    //  pollingEnabled
+    //----------------------------------
+
+    /**
+     *
+     * This is a property used on the client.
+     */
+    protected boolean piggybackingEnabled;
+
+    //----------------------------------
+    //  pollingEnabled
+    //----------------------------------
+
+    /**
+     *
+     * This is a property used on the client.
+     */
+    protected boolean pollingEnabled;
+
+    //----------------------------------
+    //  pollingIntervalMillis
+    //----------------------------------
+
+    /**
+     *
+     * This is a property used on the client.
+     */
+    protected long pollingIntervalMillis = -1;
+
+    //----------------------------------
+    //  waitInterval
+    //----------------------------------
+
+    protected long waitInterval = 0;
+
+    /**
+     * Retrieves the number of milliseconds the server poll response thread 
will be
+     * waiting for messages to arrive for the client.
+     * @return The wait interval.
+     */
+    public long getWaitInterval()
+    {
+        return waitInterval;
+    }
+
+    /**
+     * Sets the number of milliseconds the server poll response thread will be
+     * waiting for messages to arrive for the client.
+     *
+     * @param waitInterval The number of milliseconds the server poll response 
thread will be
+     * waiting for messages to arrive for the client.
+     */
+    public void setWaitInterval(long waitInterval)
+    {
+        this.waitInterval = waitInterval;
+        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval 
> 0))
+        {
+            waitEnabled = true;
+            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
+        }
+    }
+
+    //----------------------------------
+    //  waitingPollRequestsCount
+    //----------------------------------
+
+    /**
+     * Retrieves the count of the number of request threads that are currently 
in the wait state
+     * (including those on their way into or out of it).
+     *
+     * @return The count of the number of request threads that are currently 
in the wait state.
+     */
+    public int getWaitingPollRequestsCount()
+    {
+        return waitingPollRequestsCount;
+    }
+
+    
//--------------------------------------------------------------------------
+    //
+    // Public Methods
+    //
+    
//--------------------------------------------------------------------------
+
+    /**
+     *
+     * Returns a <code>ConfigMap</code> of endpoint properties that the client
+     * needs. This includes properties from <code>super.describeEndpoint</code>
+     * and additional <code>BaseHTTPEndpoint</code> specific properties under
+     * "properties" key.
+     */
+    @Override
+    public ConfigMap describeEndpoint()
+    {
+        ConfigMap endpointConfig = super.describeEndpoint();
+
+        boolean createdProperties = false;
+        ConfigMap properties = 
endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
+        if (properties == null)
+        {
+            properties = new ConfigMap();
+            createdProperties = true;
+        }
+
+        if (pollingEnabled)
+        {
+            ConfigMap pollingEnabled = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            pollingEnabled.addProperty(EMPTY_STRING, TRUE_STRING);
+            properties.addProperty(POLLING_ENABLED, pollingEnabled);
+        }
+
+        if (pollingIntervalMillis > -1)
+        {
+            ConfigMap pollingInterval = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            pollingInterval.addProperty(EMPTY_STRING, 
String.valueOf(pollingIntervalMillis));
+            properties.addProperty(POLLING_INTERVAL_MILLIS, pollingInterval);
+        }
+
+        if (piggybackingEnabled)
+        {
+            ConfigMap piggybackingEnabled = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            piggybackingEnabled.addProperty(EMPTY_STRING, 
String.valueOf(piggybackingEnabled));
+            
properties.addProperty(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, 
piggybackingEnabled);
+        }
+
+        if (createdProperties && properties.size() > 0)
+            
endpointConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, 
properties);
+
+        return endpointConfig;
+    }
+
+    /**
+     * Sets up monitoring of waited poll requests so they can be notified and 
exit when the
+     * endpoint stops.
+     *
+     * @see flex.messaging.endpoints.AbstractEndpoint#start()
+     */
+    @Override
+    public void start()
+    {
+        if (isStarted())
+            return;
+
+        super.start();
+
+        currentWaitedRequests = new ConcurrentHashMap();
+    }
+
+    /**
+     * Ensures that no poll requests in a wait state are left un-notified when 
the endpoint stops.
+     *
+     * @see flex.messaging.endpoints.AbstractEndpoint#stop()
+     */
+    @Override
+    public void stop()
+    {
+        if (!isStarted())
+            return;
+
+        // Notify any currently waiting polls.
+        for (Object notifier : currentWaitedRequests.keySet())
+        {
+            synchronized (notifier)
+            {
+                notifier.notifyAll(); // Break any current waits.
+            }
+        }
+        currentWaitedRequests = null;
+
+        super.stop();
+    }
+
+    /**
+     * (non-Javaodc)
+     * @see flex.messaging.client.PollWaitListener#waitStart(Object)
+     */
+    public void waitStart(Object notifier)
+    {
+        currentWaitedRequests.put(notifier, Boolean.TRUE);
+    }
+
+    /**
+     * (non-Javaodc)
+     * @see flex.messaging.client.PollWaitListener#waitEnd(Object)
+     */
+    public void waitEnd(Object notifier)
+    {
+        if (currentWaitedRequests != null)
+            currentWaitedRequests.remove(notifier);
+    }
+
+    
//--------------------------------------------------------------------------
+    //
+    // Protected Methods
+    //
+    
//--------------------------------------------------------------------------
+
+    /**
+     * Overrides the base poll handling to support optionally putting Http 
request handling threads
+     * into a wait state until messages are available to be delivered in the 
poll response or a timeout is reached.
+     * The number of threads that may be put in a wait state is bounded by 
<code>max-waiting-poll-requests</code>
+     * and waits will only be attempted if the canWait flag that is based on 
the <code>max-waiting-poll-requests</code>
+     * and the specified <code>wait-interval</code> is true.
+     *
+     * @param flexClient The FlexClient that issued the poll request.
+     * @param pollCommand The poll command from the client.
+     * @return The flush info used to build the poll response.
+     */
+    @Override
+    protected FlushResult handleFlexClientPoll(FlexClient flexClient, 
CommandMessage pollCommand)
+    {
+        FlushResult flushResult = null;
+        if (canWait && 
!pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
+        {
+            FlexSession session = FlexContext.getFlexSession();
+            // If canWait is true it means we currently have less than the max 
number of allowed waiting threads.
+
+            // We need to protect writes/reads to the wait count with the 
endpoint's lock.
+            // Also, we have to be careful to handle the case where two 
threads get to this point when only
+            // one wait spot remains; one thread will win and the other needs 
to revert to a non-waitable poll.
+            boolean thisThreadCanWait;
+            synchronized (lock)
+            {
+                ++waitingPollRequestsCount;
+                if (waitingPollRequestsCount == maxWaitingPollRequests)
+                {
+                    thisThreadCanWait = true; // This thread got the last wait 
spot.
+                    canWait = false;
+                }
+                else if (waitingPollRequestsCount > maxWaitingPollRequests)
+                {
+                    thisThreadCanWait = false; // This thread was beaten out 
for the last spot.
+                    --waitingPollRequestsCount; // Decrement the count because 
we're not going to try a poll with wait.
+                    canWait = false; // All the wait spots are currently 
occupied so prevent further attempts for now.
+                }
+                else
+                {
+                    // We haven't hit the limit yet, allow this thread to wait.
+                    thisThreadCanWait = true;
+                }
+            }
+
+            // Check the max waiting connections per session count
+            if (thisThreadCanWait)
+            {
+                String userAgentValue = 
FlexContext.getHttpRequest().getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
+                UserAgentSettings agentSettings = 
userAgentManager.match(userAgentValue);
+                synchronized(session)
+                {
+                    if (agentSettings != null)
+                        session.maxConnectionsPerSession = 
agentSettings.getMaxPersistentConnectionsPerSession();
+
+                    ++session.streamingConnectionsCount;
+                    if (session.maxConnectionsPerSession == 
FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
+                            || session.streamingConnectionsCount <= 
session.maxConnectionsPerSession)
+                    {
+                        thisThreadCanWait = true; // We haven't hit the limit 
yet, allow the wait.
+                    }
+                    else // (session.streamingConnectionsCount > 
session.maxConnectionsPerSession)
+                    {
+                        thisThreadCanWait = false; // no more from this client
+                        --session.streamingConnectionsCount;
+                    }
+                }
+
+                if (!thisThreadCanWait)
+                {
+                    // Decrement the waiting poll count, since this poll isn't 
going to wait.
+                    synchronized (lock)
+                    {
+                        --waitingPollRequestsCount;
+                        if (waitingPollRequestsCount < maxWaitingPollRequests)
+                            canWait = true;
+                    }
+                    if (Log.isDebug())
+                    {
+                        log.debug("Max long-polling requests per session limit 
(" + session.maxConnectionsPerSession + ") has been reached, this poll won't 
wait.");
+                    }
+                }
+
+            }
+
+            if (thisThreadCanWait)
+            {
+                if (Log.isDebug())
+                    log.debug("Number of waiting threads for endpoint with id 
'"+ getId() +"' is " + waitingPollRequestsCount + ".");
+
+                try
+                {
+                    flushResult  = flexClient.pollWithWait(getId(), 
FlexContext.getFlexSession(), this, waitInterval);
+                    if (flushResult != null)
+                    {
+                        // Prevent busy-polling due to multiple clients 
sharing a session and swapping each other out too quickly.
+                        if ((flushResult instanceof PollFlushResult) && 
((PollFlushResult)flushResult).isAvoidBusyPolling() && 
(flushResult.getNextFlushWaitTimeMillis() < 
DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
+                        {
+                            // Force the client polling interval to match the 
default defined in the client PollingChannel.
+                            
flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
+                        }
+                        else if ((clientWaitInterval > 0) && 
(flushResult.getNextFlushWaitTimeMillis() == 0))
+                        {
+                            // If the FlushResult doesn't specify it's own 
flush wait time, use the configured clientWaitInterval if defined.
+                            
flushResult.setNextFlushWaitTimeMillis(clientWaitInterval);
+                        }
+                    }
+                }
+                finally
+                {
+                    // We're done waiting so decrement the count of waiting 
threads and update the canWait flag if necessary
+                    synchronized (lock)
+                    {
+                        --waitingPollRequestsCount;
+                        if (waitingPollRequestsCount < maxWaitingPollRequests)
+                            canWait = true;
+                    }
+                    synchronized (session)
+                    {
+                        --session.streamingConnectionsCount;
+                    }
+
+                    if (Log.isDebug())
+                        log.debug("Number of waiting threads for endpoint with 
id '"+ getId() +"' is " + waitingPollRequestsCount + ".");
+                }
+            }
+        }
+        else if (Log.isDebug() && waitEnabled)
+        {
+            if 
(pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
+                log.debug("Suppressing poll wait for this request because it 
is part of a batch of messages to process.");
+            else
+                log.debug("Max waiting poll requests limit '" + 
maxWaitingPollRequests + "' has been reached for endpoint '" + getId() + "'. 
FlexClient with id '"+ flexClient.getId() + "' will poll with no wait.");
+        }
+
+        // If we weren't able to do a poll with wait above for any reason just 
run the base poll handling logic.
+        if (flushResult == null)
+        {
+            flushResult = super.handleFlexClientPoll(flexClient, pollCommand);
+            // If this is an excess poll request that we couldn't wait on, 
make sure the client doesn't poll the endpoint too aggressively.
+            // In this case, force a client wait to match the default polling 
interval defined in the client PollingChannel.
+            if ( waitEnabled && (pollingIntervalMillis < 
DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
+            {
+                if (flushResult == null)
+                {
+                    flushResult = new FlushResult();
+                }
+                
flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
+            }
+        }
+
+        return flushResult;
+    }
+}

Reply via email to