http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/RemoteSubscriptionManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/flex/messaging/services/messaging/RemoteSubscriptionManager.java 
b/core/src/flex/messaging/services/messaging/RemoteSubscriptionManager.java
deleted file mode 100644
index c1a3096..0000000
--- a/core/src/flex/messaging/services/messaging/RemoteSubscriptionManager.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging;
-
-import flex.messaging.MessageDestination;
-import flex.messaging.MessageClient;
-import flex.messaging.services.MessageService;
-import flex.messaging.log.Log;
-import flex.messaging.util.StringUtils;
-import flex.messaging.cluster.RemoveNodeListener;
-
-import java.util.List;
-
-/**
- * The RemoteSubscriptionManager monitors subscriptions from other
- * servers, not other clients.  One MessageClient instance is used for
- * each remote server.  It clientId is the address of the remote server.
- * Using a separate instance of this class keeps the subscriptions
- * of local clients separate from remote clients.
- *
- *
- */
-public class RemoteSubscriptionManager extends SubscriptionManager implements 
RemoveNodeListener
-{
-    private Object syncLock = new Object();
-
-    /*
-     * A monitor lock used for synchronizing the attempt to request 
subscriptions
-     * across the cluster during startup.
-     */
-    private static final Object initRemoteSubscriptionsLock = new Object();
-
-    public RemoteSubscriptionManager(MessageDestination destination)
-    {
-        this(destination, false);
-    }
-
-    public RemoteSubscriptionManager(MessageDestination destination, boolean 
enableManagement)
-    {
-        super(destination, enableManagement);
-    }
-
-    public void setSessionTimeout(long sessionConfigValue)
-    {
-    }
-
-    public long getSessionTimeout()
-    {
-        return 0; // not used for remote subscriptions
-    }
-
-    public void addSubscriber(String flexClientId, Object clientId, String 
selector, String subtopic)
-    {
-        synchronized (syncLock)
-        {
-            /*
-             * Only process subscriptions for servers whose subscription state 
we have received
-             * We may receive a subscribe/unsubscribe from a peer before we 
get their
-             * subscription state... we ignore these since they will be 
included in the
-             * state we receive later
-             */
-            if (allSubscriptions.get(clientId) != null)
-                super.addSubscriber(clientId, selector, subtopic, null);
-            else if (Log.isDebug())
-                Log.getLogger(MessageService.LOG_CATEGORY).debug("Ignoring new 
remote subscription for server: " + clientId + " whose subscription state we 
have not yet received.  selector: " + selector + " subtopic: " + subtopic);
-        }
-    }
-
-    public void removeSubscriber(String flexClientId, Object clientId, String 
selector, String subtopic, String endpoint)
-    {
-        synchronized (syncLock)
-        {
-            /* Only process subscriptions for servers whose subscription state 
we have received */
-            if (allSubscriptions.get(clientId) != null)
-                super.removeSubscriber(clientId, selector, subtopic, null);
-        }
-    }
-
-    protected void sendSubscriptionToPeer(boolean subscribe, String selector, 
String subtopic)
-    {
-        // Don't do this for remote subscriptions
-    }
-
-    protected MessageClient newMessageClient(Object clientId, String 
endpointId)
-    {
-        return new RemoteMessageClient(clientId, destination, endpointId);
-    }
-
-    /**
-     * Takes the selector and subtopic list from this address and
-     * for each one create a RemoteSubscription which gets
-     * registered in this table.  We also register the remote
-     * subscription with a "per server" index so we can easily
-     * remove them later on.
-     * @param state the subscription state object
-     * @param address the remote cluster node address
-     */
-    public void setSubscriptionState(Object state, Object address)
-    {
-        MessageClient client = newMessageClient(address, null);
-
-        if (Log.isDebug())
-            Log.getLogger(MessageService.LOG_CATEGORY).debug("Received 
subscription state for destination: " + destination.getId() + " from server: " 
+ address + StringUtils.NEWLINE + state);
-
-        /*
-         * need to be sure we do not accept any remote sub/unsub messages
-         * from a given server until we have received its subscription state
-         *
-         * Also, we need to ensure we do not process any remote 
subscribe/unsubs
-         * from a remote server until we finish this list.
-         */
-        synchronized (syncLock)
-        {
-            allSubscriptions.put(address, client);
-
-            List list = (List) state;
-
-            for (int i = 0; i < list.size(); i+=2)
-            {
-                addSubscriber(null, address, (String) list.get(i), (String) 
list.get(i+1));
-            }
-        }
-        synchronized (initRemoteSubscriptionsLock)
-        {
-            initRemoteSubscriptionsLock.notifyAll();
-        }
-    }
-
-    /**
-     * This method waits for some time for the receipt of the subscription
-     * state for the server with the given address.  If we fail to receive
-     * a message after waiting for the 5 seconds, a warning is printed.
-     * @param addr the remote cluster node address
-     */
-    public void waitForSubscriptions(Object addr)
-    {
-        /* If we have not gotten the response yet from this client... */
-        if (getSubscriber(addr) == null)
-        {
-            synchronized (initRemoteSubscriptionsLock)
-            {
-                try
-                {
-                    if (Log.isDebug())
-                        
Log.getLogger(MessageService.LOG_CATEGORY).debug("Waiting for subscriptions 
from cluster node: " + addr + " for destination: " + destination.getId());
-
-                    initRemoteSubscriptionsLock.wait(5000);
-
-                    if (Log.isDebug())
-                        Log.getLogger(MessageService.LOG_CATEGORY).debug("Done 
waiting for subscriptions from cluster node: " + addr + " for destination: " + 
destination.getId());
-                }
-                catch (InterruptedException exc) {}
-            }
-            if (getSubscriber(addr) == null && Log.isWarn())
-                Log.getLogger(MessageService.LOG_CATEGORY).warn("No response 
yet from request subscriptions request for server: " + addr + " for 
destination: " + destination.getId());
-        }
-        else if (Log.isDebug())
-            Log.getLogger(MessageService.LOG_CATEGORY).debug("Already have 
subscriptions from server: " + addr + " for destination: " + 
destination.getId());
-    }
-
-    /**
-     * Called when a cluster node gets removed.  We need to make sure that all 
subscriptions
-     * for this node are removed.
-     * @param address the remote cluster node address
-     */
-    public void removeClusterNode(Object address)
-    {
-        if (Log.isDebug())
-            Log.getLogger(MessageService.LOG_CATEGORY).debug("Cluster node: " 
+ address + " subscriptions being removed for destination:" + 
destination.getId() + " before: " + StringUtils.NEWLINE + 
getDebugSubscriptionState());
-
-        MessageClient client = getSubscriber(address);
-        if (client != null)
-        {
-            client.invalidate();
-        }
-
-        if (Log.isDebug())
-            Log.getLogger(MessageService.LOG_CATEGORY).debug("Cluster node: " 
+ address + " subscriptions being removed for destination:" + 
destination.getId() + " after: " + StringUtils.NEWLINE + 
getDebugSubscriptionState());
-    }
-
-    protected void monitorTimeout(MessageClient client)
-    {
-        // Remote subscriptions do not timeout
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/SubscriptionManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/flex/messaging/services/messaging/SubscriptionManager.java 
b/core/src/flex/messaging/services/messaging/SubscriptionManager.java
deleted file mode 100644
index 9e53cf1..0000000
--- a/core/src/flex/messaging/services/messaging/SubscriptionManager.java
+++ /dev/null
@@ -1,915 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging;
-
-import flex.management.ManageableComponent;
-import flex.messaging.FlexContext;
-import flex.messaging.MessageClient;
-import flex.messaging.MessageDestination;
-import flex.messaging.MessageException;
-import flex.messaging.client.FlexClient;
-import flex.messaging.config.ServerSettings.RoutingMode;
-import flex.messaging.log.Log;
-import flex.messaging.messages.AsyncMessage;
-import flex.messaging.messages.Message;
-import flex.messaging.security.MessagingSecurity;
-import flex.messaging.services.MessageService;
-import flex.messaging.services.ServiceAdapter;
-import flex.messaging.services.ServiceException;
-import flex.messaging.services.messaging.selector.JMSSelector;
-import flex.messaging.services.messaging.selector.JMSSelectorException;
-import flex.messaging.util.StringUtils;
-import flex.messaging.util.TimeoutManager;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-
-/**
- *
- * The SubscriptionManager monitors subscribed clients for MessageService
- * and its subclasses, such as DataService.
- */
-public class SubscriptionManager extends ManageableComponent
-{
-    public static final String TYPE = "SubscriptionManager";
-    private static final int SUBTOPICS_NOT_SUPPORTED = 10553;
-    private static final int WILDCARD_SUBTOPICS_NOT_ALLOWED = 10560;
-    private static final Object classMutex = new Object();
-    private static int instanceCount = 0;
-
-    /**
-     * clientId to MessageClient Map for any subscriber. Note that clientId is 
tracked as
-     * Object instead of String because in clustering, clientId is not 
guaranteed to be String.
-     */
-    protected final Map<Object, MessageClient> allSubscriptions = new 
ConcurrentHashMap<Object, MessageClient>();
-    // This lock protects allSubscriptions as synchronizing on a Concurrent 
class does not work.
-    private final Object allSubscriptionsLock = new Object();
-
-    /** Subscriptions with no subtopic. */
-    private final TopicSubscription globalSubscribers = new 
TopicSubscription();
-
-    /** Subscriptions with a simple subtopic. */
-    private final Map<Subtopic, TopicSubscription> subscribersPerSubtopic = 
new ConcurrentHashMap<Subtopic, TopicSubscription>();
-
-    /** Subscriptions with a wildcard subtopic. */
-    private final Map<Subtopic, TopicSubscription> 
subscribersPerSubtopicWildcard = new ConcurrentHashMap<Subtopic, 
TopicSubscription>();
-
-    protected final MessageDestination destination;
-    // We can either timeout subscriptions by session expiration 
(idleSubscriptionTimeout=0) or by an explicit
-    // timeout.  If we time them out by timeout, this refers to the 
TimeoutManager
-    // we use to monitor session timeouts.
-    private TimeoutManager subscriberSessionManager;
-    private long subscriptionTimeoutMillis;
-
-    /**
-     * Construct a subscription manager for a destination.
-     *
-     * @param destination the destination
-     */
-    public SubscriptionManager(MessageDestination destination)
-    {
-        this(destination, false);
-    }
-
-    /**
-     * Construct a subscription manager for a destination.
-     *
-     * @param destination the destination
-     * @param enableManagement turn on management?
-     */
-    public SubscriptionManager(MessageDestination destination, boolean 
enableManagement)
-    {
-        super(enableManagement);
-        synchronized (classMutex)
-        {
-            super.setId(TYPE + ++instanceCount);
-        }
-        this.destination = destination;
-
-        subscriptionTimeoutMillis = 0;
-    }
-
-    // This component's id should never be changed as it's generated internally
-    /** {@inheritDoc} */
-    @Override public void setId(String id)
-    {
-        // No-op
-    }
-
-    /**
-     * Stops the subscription manager.
-     */
-    @Override public void stop()
-    {
-        super.stop();
-
-        // Remove management.
-        if (isManaged() && getControl() != null)
-        {
-            getControl().unregister();
-            setControl(null);
-            setManaged(false);
-        }
-
-        // Destroy subscriptions
-        synchronized (this)
-        {
-            if (subscriberSessionManager != null)
-            {
-                subscriberSessionManager.shutdown();
-                subscriberSessionManager = null;
-            }
-        }
-
-        synchronized (allSubscriptionsLock)
-        {
-            if (!allSubscriptions.isEmpty())
-            {
-                for (Map.Entry<Object, MessageClient> objectMessageClientEntry 
: allSubscriptions.entrySet())
-                    removeSubscriber(objectMessageClientEntry.getValue());
-            }
-        }
-    }
-
-    /**
-     * Set the timeout value.  Creates a timeout manager thread if needed.
-     *
-     * @param value the timeout value in milliseconds
-     */
-    public void setSubscriptionTimeoutMillis(long value)
-    {
-        subscriptionTimeoutMillis = value;
-        if (subscriptionTimeoutMillis > 0)
-        {
-            subscriberSessionManager = new TimeoutManager(new ThreadFactory()
-                                                            {
-                                                                int counter = 
1;
-                                                                public 
synchronized Thread newThread(Runnable runnable)
-                                                                {
-                                                                    Thread t = 
new Thread(runnable);
-                                                                    
t.setName(destination.getId() + "-SubscriptionTimeoutThread-" + counter++);
-                                                                    return t;
-                                                                }
-                                                            });
-        }
-    }
-
-    /**
-     * Returns the subscription timeout.
-     *
-     * @return the timeout in milliseconds
-     */
-    public long getSubscriptionTimeoutMillis()
-    {
-        return subscriptionTimeoutMillis;
-    }
-
-    /**
-     * Implement a serializer instance which wraps the subscription
-     * manager in a transient variable.  It will need to block out
-     * all sub/unsub messages before they are broadcast to the
-     * remote server, iterate through the maps of subscriptions and
-     * for each "unique" subscription it writes the selector and
-     * subtopic.
-     *
-     * synchronization note: this assumes no add/remove subscriptions
-     * are occurring while this method is called.
-     *
-     * @return a List of subscriptions selectors and subtopics
-     */
-    public Object getSubscriptionState()
-    {
-        ArrayList<String> subState = new ArrayList<String>();
-
-        if (globalSubscribers.defaultSubscriptions != null &&
-            !globalSubscribers.defaultSubscriptions.isEmpty())
-        {
-            subState.add(null); // selector string
-            subState.add(null); // subtopic string
-        }
-        if (globalSubscribers.selectorSubscriptions != null)
-        {
-            for (String s : globalSubscribers.selectorSubscriptions.keySet())
-            {
-                subState.add(s);
-                subState.add(null); // subtopic
-            }
-        }
-        addSubscriptionState(subState, subscribersPerSubtopic);
-        addSubscriptionState(subState, subscribersPerSubtopicWildcard);
-
-        if (Log.isDebug())
-            Log.getLogger(MessageService.LOG_CATEGORY).debug("Retrieved 
subscription state to send to new cluster member for destination: " + 
destination.getId() + ": " + StringUtils.NEWLINE + subState);
-
-        return subState;
-    }
-
-    private void addSubscriptionState(List<String> subState, Map<Subtopic, 
TopicSubscription> subsPerSubtopic)
-    {
-        for (Map.Entry<Subtopic, TopicSubscription> entry : 
subsPerSubtopic.entrySet())
-        {
-            Subtopic subtopic = entry.getKey();
-            TopicSubscription tc = entry.getValue();
-
-            if (tc.defaultSubscriptions != null && 
!tc.defaultSubscriptions.isEmpty())
-            {
-                subState.add(null);
-                subState.add(subtopic.toString());
-            }
-            if (tc.selectorSubscriptions != null)
-            {
-                for (String s : tc.selectorSubscriptions.keySet())
-                {
-                    subState.add(s);
-                    subState.add(subtopic.toString()); // subtopic
-                }
-            }
-        }
-
-    }
-
-    /**
-     * Get a string representation of the subscription state.
-     * @return the string
-     */
-    protected String getDebugSubscriptionState()
-    {
-        StringBuffer sb = new StringBuffer(100);
-
-        sb.append(" global subscriptions: 
").append(globalSubscribers).append(StringUtils.NEWLINE);
-        sb.append(" regular subtopic subscriptions: 
").append(subscribersPerSubtopic).append(StringUtils.NEWLINE);
-        sb.append(" wildcard subtopic subscriptions: 
").append(subscribersPerSubtopicWildcard).append(StringUtils.NEWLINE);
-        return sb.toString();
-    }
-
-    @Override protected String getLogCategory()
-    {
-        return MessageService.LOG_CATEGORY;
-    }
-
-    /**
-     * Return the ids of our subscribers.
-     * @return a set of subscriber ids
-     */
-    public Set<Object> getSubscriberIds()
-    {
-        return allSubscriptions.keySet();
-    }
-
-    /**
-     * Return the set of subscribers for a message.
-     *
-     * @param message the message
-     * @param evalSelector should the selector be evaluated?
-     * @return the set of subscribers
-     */
-    public Set<Object> getSubscriberIds(Message message, boolean evalSelector)
-    {
-        Set<Object> ids = new LinkedHashSet<Object>();
-
-        Object subtopicObj = 
message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
-
-        if (subtopicObj instanceof Object[])
-            subtopicObj = Arrays.asList((Object[])subtopicObj);
-
-        if (subtopicObj instanceof String)
-        {
-            String subtopicString = (String) subtopicObj;
-
-            if (subtopicString.length() > 0)
-                addSubtopicSubscribers(subtopicString, message, ids, 
evalSelector);
-            else
-                addTopicSubscribers(globalSubscribers, message, ids, 
evalSelector);
-        }
-        else if (subtopicObj instanceof List)
-        {
-            @SuppressWarnings("unchecked")
-            List<String> subtopicList = (List<String>)subtopicObj;
-            for (String aSubtopicList : subtopicList)
-            {
-                addSubtopicSubscribers(aSubtopicList, message, ids, 
evalSelector);
-            }
-        }
-        else
-            addTopicSubscribers(globalSubscribers, message, ids, evalSelector);
-
-        return ids;
-    }
-
-    /**
-     * Return the set of subscribers for a message.
-     *
-     * @param message the message
-     * @param evalSelector hould the selector be evaluated?
-     * @param subtopics the subtopics to use
-     * @return the set of subscribers
-     */
-    public Set<Object> getSubscriberIds(Message message, boolean evalSelector, 
List<Subtopic> subtopics)
-    {
-        Set<Object> ids = new LinkedHashSet<Object>();
-
-        if (subtopics == null || subtopics.isEmpty())
-        {
-            addTopicSubscribers(globalSubscribers, message, ids, evalSelector);
-        }
-        else
-        {
-            for (Subtopic subtopic : subtopics)
-            {
-                addSubtopicSubscribers(subtopic, message, ids, evalSelector);
-            }
-        }
-        return ids;
-    }
-
-    /**
-     * Return the set of subscribers for a subtopic pattern.
-     * Constructs a message and calls {@link 
#getSubscriberIds(flex.messaging.messages.Message, boolean)}.
-     *
-     * @param subtopicPattern  the pattern to match
-     * @param messageHeaders the message headers
-     * @return the set of subscribers
-     */
-    public Set<Object> getSubscriberIds(String subtopicPattern, Map 
messageHeaders)
-    {
-        // This could be more efficient but we'd have to change the SQLParser 
to accept a map.
-        Message msg = new AsyncMessage();
-        msg.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subtopicPattern);
-        if (messageHeaders != null)
-            msg.setHeaders(messageHeaders);
-        return getSubscriberIds(msg, true);
-    }
-
-    void addSubtopicSubscribers(String subtopicString, Message message, 
Set<Object> ids, boolean evalSelector)
-    {
-        Subtopic subtopic = getSubtopic(subtopicString);
-        addSubtopicSubscribers(subtopic, message, ids, evalSelector);
-    }
-
-    void addSubtopicSubscribers(Subtopic subtopic, Message message, 
Set<Object> ids, boolean evalSelector)
-    {
-        // If we have a subtopic, we need to route the message only to that
-        // subset of subscribers.
-        if (!destination.getServerSettings().getAllowSubtopics())
-        {
-            // Throw an error - the destination doesn't allow subtopics.
-            ServiceException se = new ServiceException();
-            se.setMessage(SUBTOPICS_NOT_SUPPORTED, new Object[] 
{subtopic.getValue(), destination.getId()});
-            throw se;
-        }
-
-        // Give a MessagingAdapter a chance to block the send to this subtopic.
-        ServiceAdapter adapter = destination.getAdapter();
-        if (adapter instanceof MessagingSecurity)
-        {
-            if (!((MessagingSecurity)adapter).allowSend(subtopic))
-            {
-                ServiceException se = new ServiceException();
-                se.setMessage(10558, new Object[] {subtopic.getValue()});
-                throw se;
-            }
-        }
-
-        TopicSubscription ts;
-        if (subscribersPerSubtopic.containsKey(subtopic))
-        {
-            ts = subscribersPerSubtopic.get(subtopic);
-            addTopicSubscribers(ts, message, ids, evalSelector);
-        }
-
-        /*
-         * TODO: performance - organize these into a tree so we can find 
consumers via
-         * a hashtable lookup rather than a linear search
-         */
-        Set<Subtopic> subtopics = subscribersPerSubtopicWildcard.keySet();
-        if (!subtopics.isEmpty())
-        {
-            for (Subtopic st : subtopics)
-            {
-                if (st.matches(subtopic))
-                {
-                    ts = subscribersPerSubtopicWildcard.get(st);
-                    addTopicSubscribers(ts, message, ids, evalSelector);
-                }
-            }
-        }
-    }
-
-    void addTopicSubscribers(TopicSubscription ts, Message message, 
Set<Object> ids, boolean evalSelector)
-    {
-        if (ts == null)
-            return;
-
-        Map<Object, MessageClient> subs = ts.defaultSubscriptions;
-        if (subs != null)
-            ids.addAll(subs.keySet());
-
-        if (ts.selectorSubscriptions == null)
-            return;
-
-        for (Map.Entry<String, Map<Object, MessageClient>> entry : 
ts.selectorSubscriptions.entrySet())
-        {
-            String selector = entry.getKey();
-            subs = entry.getValue();
-
-            if (!evalSelector)
-            {
-                ids.addAll(subs.keySet());
-            }
-            else
-            {
-                JMSSelector jmsSel = new JMSSelector(selector);
-                try
-                {
-                    if (jmsSel.match(message))
-                    {
-                        ids.addAll(subs.keySet());
-                    }
-                }
-                catch (JMSSelectorException jmse)
-                {
-                    if (Log.isWarn())
-                    {
-                        Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error 
processing message selector: " +
-                                jmsSel + StringUtils.NEWLINE +
-                                "  incomingMessage: " + message + 
StringUtils.NEWLINE +
-                                "  selector: " + selector);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Returns the requested subscriber.
-     * If the subscriber exists it is also registered for subscription timeout 
if necessary.
-     * If the subscriber is not found this method returns null.
-     *
-     * @param clientId The clientId of the target subscriber.
-     * @return The subscriber, or null if the subscriber is not found.
-     */
-    public MessageClient getSubscriber(Object clientId)
-    {
-        MessageClient client = allSubscriptions.get(clientId);
-        if (client != null && !client.isTimingOut())
-            monitorTimeout(client);
-        return client;
-    }
-
-    /**
-     * Removes the subscriber, unsubscribing it from all current subscriptions.
-     * This is used by the admin UI.
-     *
-     * @param client the client
-     */
-    public void removeSubscriber(MessageClient client)
-    {
-        // Sends unsub messages for each subscription for this MessageClient 
which
-        // should mean we remove the client at the end.
-        client.invalidate();
-
-        if (getSubscriber(client.getClientId()) != null)
-            Log.getLogger(MessageService.LOG_CATEGORY).error("Failed to remove 
client: " + client.getClientId());
-    }
-
-    /**
-     * Add a subscriber.
-     *
-     * @param clientId the client id
-     * @param selector the selector
-     * @param subtopicString the subtopic
-     * @param endpointId the endpoint
-     */
-    public void addSubscriber(Object clientId, String selector, String 
subtopicString, String endpointId)
-    {
-        addSubscriber(clientId, selector, subtopicString, endpointId, 0);
-    }
-
-    /**
-     * Add a subscriber.
-     *
-     * @param clientId the client id
-     * @param selector the selector
-     * @param subtopicString the subtopic
-     * @param endpointId the endpoint
-     * @param maxFrequency maximum frequency
-     */
-    public void addSubscriber(Object clientId, String selector, String 
subtopicString, String endpointId, int maxFrequency)
-    {
-        Subtopic subtopic = getSubtopic(subtopicString);
-        MessageClient client = null;
-        TopicSubscription topicSub;
-        Map<Object, MessageClient> subs;
-        Map<Subtopic, TopicSubscription> map;
-
-        try
-        {
-            // Handle resubscribes from the same client and duplicate 
subscribes from different clients
-            boolean subscriptionAlreadyExists = (getSubscriber(clientId) != 
null);
-            client = getMessageClient(clientId, endpointId);
-
-            FlexClient flexClient = FlexContext.getFlexClient();
-            if (subscriptionAlreadyExists)
-            {
-                // Block duplicate subscriptions from multiple FlexClients if 
they
-                // attempt to use the same clientId.  (when this is called 
from a remote
-                // subscription, there won't be a flex client so skip this 
test).
-                if (flexClient != null && 
!flexClient.getId().equals(client.getFlexClient().getId()))
-                {
-                    ServiceException se = new ServiceException();
-                    se.setMessage(10559, new Object[] {clientId});
-                    throw se;
-                }
-
-                // It's a resubscribe. Reset the endpoint push state for the 
subscription to make sure its current
-                // because a resubscribe could be arriving over a new endpoint 
or a new session.
-                client.resetEndpoint(endpointId);
-            }
-
-            ServiceAdapter adapter = destination.getAdapter();
-            client.updateLastUse();
-
-            if (subtopic == null)
-            {
-                topicSub = globalSubscribers;
-            }
-            else
-            {
-                if (!destination.getServerSettings().getAllowSubtopics())
-                {
-                    // Throw an error - the destination doesn't allow 
subtopics.
-                    ServiceException se = new ServiceException();
-                    se.setMessage(SUBTOPICS_NOT_SUPPORTED, new Object[] 
{subtopicString, destination.getId()});
-                    throw se;
-                }
-
-                if (subtopic.containsSubtopicWildcard() && 
destination.getServerSettings().isDisallowWildcardSubtopics())
-                {
-                    // Attempt to subscribe to the subtopic, ''{0}'', on 
destination, ''{1}'', that does not allow wilcard subtopics failed.
-                    ServiceException se = new ServiceException();
-                    se.setMessage(WILDCARD_SUBTOPICS_NOT_ALLOWED, new Object[] 
{subtopicString, destination.getId()});
-                    throw se;
-                }
-
-                // Give a MessagingAdapter a chance to block the subscribe.
-                if ((adapter instanceof MessagingSecurity) && (subtopic != 
null))
-                {
-                    if (!((MessagingSecurity)adapter).allowSubscribe(subtopic))
-                    {
-                        ServiceException se = new ServiceException();
-                        se.setMessage(10557, new Object[] {subtopicString});
-                        throw se;
-                    }
-                }
-
-                /*
-                 * If there is a wildcard, we always need to match that 
subscription
-                 * against the producer.  If it has no wildcard, we can do a 
quick
-                 * lookup to find the subscribers.
-                 */
-                if (subtopic.containsSubtopicWildcard())
-                    map = subscribersPerSubtopicWildcard;
-                else
-                    map = subscribersPerSubtopic;
-
-                synchronized (this)
-                {
-                    topicSub = map.get(subtopic);
-                    if (topicSub == null)
-                    {
-                        topicSub = new TopicSubscription();
-                        map.put(subtopic, topicSub);
-                    }
-                }
-            }
-
-            /* Subscribing with no selector */
-            if (selector == null)
-            {
-                subs = topicSub.defaultSubscriptions;
-                if (subs == null)
-                {
-                    synchronized (this)
-                    {
-                        if ((subs = topicSub.defaultSubscriptions) == null)
-                            topicSub.defaultSubscriptions = subs = new 
ConcurrentHashMap<Object, MessageClient>();
-                    }
-                }
-            }
-            /* Subscribing with a selector - store all subscriptions under the 
selector key */
-            else
-            {
-                synchronized (this)
-                {
-                    if (topicSub.selectorSubscriptions == null)
-                        topicSub.selectorSubscriptions = new 
ConcurrentHashMap<String,  Map<Object, MessageClient>>();
-                }
-
-                subs = topicSub.selectorSubscriptions.get(selector);
-                if (subs == null)
-                {
-                    synchronized (this)
-                    {
-                        if ((subs = 
topicSub.selectorSubscriptions.get(selector)) == null)
-                            topicSub.selectorSubscriptions.put(selector, subs 
= new ConcurrentHashMap<Object, MessageClient>());
-                    }
-                }
-            }
-
-            if (subs.containsKey(clientId))
-            {
-                /* I'd rather this be an error but in 2.0 we allowed this 
without error */
-                if (Log.isWarn())
-                    Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Client: " + 
clientId + " already subscribed to: " + destination.getId() + " selector: " + 
selector + " subtopic: " + subtopicString);
-            }
-            else
-            {
-                client.addSubscription(selector, subtopicString, maxFrequency);
-                synchronized (this)
-                {
-                    /*
-                     * Make sure other members of the cluster know that we are 
subscribed to
-                     * this info if we are in server-to-server mode
-                     *
-                     * This has to be done in the synchronized section so that 
we properly
-                     * order subscribe and unsubscribe messages for our peers 
so their
-                     * subscription state matches the one in the local server.
-                     */
-                    if (subs.isEmpty() && destination.isClustered() &&
-                        destination.getServerSettings().getRoutingMode() == 
RoutingMode.SERVER_TO_SERVER)
-                        sendSubscriptionToPeer(true, selector, subtopicString);
-                    subs.put(clientId, client);
-                }
-                monitorTimeout(client); // local operation, timeouts on remote 
host are not started until failover
-
-                // Finally, if a new MessageClient was created, notify its 
created
-                // listeners now that MessageClient's subscription state is 
setup.
-                if (!subscriptionAlreadyExists)
-                    client.notifyCreatedListeners();
-            }
-        }
-        finally
-        {
-            releaseMessageClient(client);
-        }
-
-    }
-
-    /**
-     * Remove a subscriber.
-     *
-     * @param clientId the client id
-     * @param selector the selector
-     * @param subtopicString the subtopic
-     * @param endpointId the endpoint
-     */
-    public void removeSubscriber(Object clientId, String selector, String 
subtopicString, String endpointId)
-    {
-        MessageClient client = null;
-        try
-        {
-            synchronized (allSubscriptionsLock)
-            {
-                // Do a simple lookup first to avoid the creation of a new 
MessageClient instance
-                // in the following call to getMessageClient() if the 
subscription is already removed.
-                client = allSubscriptions.get(clientId);
-                if (client == null) // Subscription was already removed.
-                    return;
-
-                // Re-get in order to track refs correctly.
-                client = getMessageClient(clientId, endpointId);
-            }
-
-            Subtopic subtopic = getSubtopic(subtopicString);
-            TopicSubscription topicSub;
-            Map<Object, MessageClient> subs;
-            Map<Subtopic, TopicSubscription> map = null;
-
-            if (subtopic == null)
-            {
-                topicSub = globalSubscribers;
-            }
-            else
-            {
-                if (subtopic.containsSubtopicWildcard())
-                    map = subscribersPerSubtopicWildcard;
-                else
-                    map = subscribersPerSubtopic;
-
-                topicSub = map.get(subtopic);
-
-                if (topicSub == null)
-                    throw new MessageException("Client: " + clientId + " not 
subscribed to subtopic: " + subtopic);
-            }
-
-            if (selector == null)
-                subs = topicSub.defaultSubscriptions;
-            else
-                subs = topicSub.selectorSubscriptions.get(selector);
-
-            if (subs == null || subs.get(clientId) == null)
-                throw new MessageException("Client: " + clientId + " not 
subscribed to destination with selector: " + selector);
-
-            synchronized (this)
-            {
-                subs.remove(clientId);
-                if (subs.isEmpty() &&
-                    destination.isClustered() && 
destination.getServerSettings().getRoutingMode() == 
RoutingMode.SERVER_TO_SERVER)
-                    sendSubscriptionToPeer(false, selector, subtopicString);
-
-                if (subs.isEmpty())
-                {
-                    if (selector != null)
-                    {
-                        if (topicSub.selectorSubscriptions != null && 
!topicSub.selectorSubscriptions.isEmpty())
-                            topicSub.selectorSubscriptions.remove(selector);
-                    }
-
-                    if (subtopic != null &&
-                        (topicSub.selectorSubscriptions == null || 
topicSub.selectorSubscriptions.isEmpty()) &&
-                        (topicSub.defaultSubscriptions == null || 
topicSub.defaultSubscriptions.isEmpty()))
-                    {
-                           if ((topicSub.selectorSubscriptions == null || 
topicSub.selectorSubscriptions.isEmpty()) &&
-                               (topicSub.defaultSubscriptions == null || 
topicSub.defaultSubscriptions.isEmpty()))
-                               map.remove(subtopic);
-                    }
-                }
-            }
-
-            if (client.removeSubscription(selector, subtopicString))
-            {
-                allSubscriptions.remove(clientId);
-                client.invalidate(); // Destroy the MessageClient.
-            }
-        }
-        finally
-        {
-            if (client != null)
-                releaseMessageClient(client);
-        }
-    }
-
-    /**
-     * Create a new MessageClient object.
-     *
-     * @param clientId the client id
-     * @param endpointId the endpoint
-     * @return constructed MessageClient
-     */
-    protected MessageClient newMessageClient(Object clientId, String 
endpointId)
-    {
-        return new MessageClient(clientId, destination, endpointId, true);
-    }
-
-    /**
-     * This method is used for subscribers who maintain client ids in their
-     * own subscription tables.  It ensures we have the MessageClient for
-     * a given clientId for as long as this session is valid (or the
-     * subscription times out).
-     *
-     * @param clientId the client id
-     * @param endpointId the endpoint
-     * @return registered MessageClient
-     */
-    public MessageClient registerMessageClient(Object clientId, String 
endpointId)
-    {
-        MessageClient client = getMessageClient(clientId, endpointId);
-
-        monitorTimeout(client);
-
-        /*
-         * There is only one reference to the MessageClient for the
-         * registered flag.  If someone happens to register the
-         * same client more than once, just allow that to add one reference.
-         */
-        if (client.isRegistered())
-            releaseMessageClient(client);
-        else
-            client.setRegistered(true);
-
-        return client;
-    }
-
-    /**
-     * Return a message client, creating it if needed.
-     *
-     * @param clientId the client if
-     * @param endpointId the endpoint
-     * @return the MessageClient
-     */
-    public MessageClient getMessageClient(Object clientId, String endpointId)
-    {
-        synchronized (allSubscriptionsLock)
-        {
-            MessageClient client = allSubscriptions.get(clientId);
-            if (client == null)
-            {
-                client = newMessageClient(clientId, endpointId);
-                allSubscriptions.put(clientId, client);
-            }
-
-            client.incrementReferences();
-            return client;
-        }
-    }
-
-    /**
-     * Release a client.
-     *
-     * @param client the client to release
-     */
-    public void releaseMessageClient(MessageClient client)
-    {
-        if (client == null)
-            return;
-
-        synchronized (allSubscriptionsLock)
-        {
-            if (client.decrementReferences())
-            {
-                allSubscriptions.remove(client.getClientId());
-                client.invalidate(); // Destroy the MessageClient.
-            }
-        }
-    }
-
-    /**
-     * Turn on timeout for the provided client.
-     *
-     * @param client the client
-     */
-    protected void monitorTimeout(MessageClient client)
-    {
-        if (subscriberSessionManager != null)
-        {
-            synchronized (client)
-            {
-                if (!client.isTimingOut())
-                {
-                    subscriberSessionManager.scheduleTimeout(client);
-                    client.setTimingOut(true);
-                }
-            }
-        }
-    }
-
-    private Subtopic getSubtopic(String subtopic)
-    {
-        return subtopic != null?
-                new Subtopic(subtopic, 
destination.getServerSettings().getSubtopicSeparator()) : null;
-    }
-
-    /**
-     * Broadcast this subscribe/unsubscribe message to the cluster so everyone 
is aware
-     * of this server's interest in messages matching this selector and 
subtopic.
-     *
-     * @param subscribe are we subscribing?
-     * @param selector the selector
-     * @param subtopic the subtopic
-     */
-    protected void sendSubscriptionToPeer(boolean subscribe, String selector, 
String subtopic)
-    {
-        if (Log.isDebug())
-            Log.getLogger(MessageService.LOG_CATEGORY).debug("Sending 
subscription to peers for subscribe? " + subscribe + " selector: " + selector + 
" subtopic: " + subtopic);
-
-        
((MessageService)destination.getService()).sendSubscribeFromPeer(destination.getId(),
 subscribe, selector, subtopic);
-    }
-
-    static class TopicSubscription
-    {
-        /** This is the Map of clientId to MessageClient for each client 
subscribed to this topic with no selector. */
-        Map<Object, MessageClient> defaultSubscriptions;
-
-        /** A map of selector string to Map of clientId to MessageClient. */
-        Map<String,  Map<Object, MessageClient>> selectorSubscriptions;
-
-        @Override public String toString()
-        {
-            StringBuffer sb = new StringBuffer(100);
-
-            sb.append("default subscriptions: 
").append(defaultSubscriptions).append(StringUtils.NEWLINE);
-            sb.append("selector subscriptions: 
").append(selectorSubscriptions).append(StringUtils.NEWLINE);
-            return sb.toString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/Subtopic.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/services/messaging/Subtopic.java 
b/core/src/flex/messaging/services/messaging/Subtopic.java
deleted file mode 100644
index ba641b0..0000000
--- a/core/src/flex/messaging/services/messaging/Subtopic.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging;
-
-import flex.messaging.services.ServiceException;
-
-import java.util.StringTokenizer;
-
-/**
- * Represents a message destination subtopic.  You are given instances of 
Subtopics
- * as arguments to the MessagingAdapter.allowSubscribe and allowSend methods.  
These
- * are used to implement your own authorization for the subscribe and send 
messages
- * to specific subtopics.
- */
-public class Subtopic
-{
-    
//--------------------------------------------------------------------------
-    //
-    // Constructor
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Constructor.
-     *
-     *
-     *
-     * @param subtopic The full subtopic string.
-     * @param separator The separator for tokenizing a hierarchical subtopic.
-     */
-    public Subtopic(String subtopic, String separator)
-    {
-        this.subtopic = subtopic;
-        this.separator = separator;
-
-        // Subtopic cannot be zero length.
-        if (subtopic.length() == 0)
-        {
-            ServiceException se = new ServiceException();
-            se.setMessage(10554, new Object[] {subtopic});
-            throw se;
-        }
-        // Validate subtopic format if it contains a separator.
-        else if ((separator != null) && (subtopic.indexOf(separator) != -1))
-        {
-            hierarchical = true;
-            /*
-             * Each token must have non-zero length, meaning no leading or 
trailing
-             * separator or empty subtopics in between.
-             */
-            if (subtopic.startsWith(separator) ||
-                subtopic.endsWith(separator) ||
-                (subtopic.indexOf(separator + separator) != -1))
-            {
-                ServiceException se = new ServiceException();
-                se.setMessage(10554, new Object[] {subtopic});
-                throw se;
-            }
-
-            /*
-             * If a token contains the SUBTOPIC_WILDCARD, that token may not
-             * contain any additional characters.
-             * I.e. chat.* is OK, chat.f* is not OK (assuming a separator of 
'.').
-             */
-
-            StringTokenizer tokenizer = new StringTokenizer(subtopic, 
separator);
-            while (tokenizer.hasMoreTokens())
-            {
-                String token = tokenizer.nextToken();
-                if (token.indexOf(SUBTOPIC_WILDCARD) != -1)
-                {
-                    if (!token.equals(SUBTOPIC_WILDCARD))
-                    {
-                        ServiceException se = new ServiceException();
-                        se.setMessage(10554, new Object[] {subtopic});
-                        throw se;
-                    }
-                    else
-                    {
-                        hasSubtopicWildcard = true;
-                    }
-                }
-            }
-        }
-        // Non-hierarchical subtopics cannot contain subtopic wildcard unless
-        // that is the only value, "*", indicating a match for any subtopic.
-        else if (subtopic.indexOf(SUBTOPIC_WILDCARD) != -1)
-        {
-            if (!subtopic.equals(SUBTOPIC_WILDCARD))
-            {
-                ServiceException se = new ServiceException();
-                se.setMessage(10554, new Object[] {subtopic});
-                throw se;
-            }
-            else
-            {
-                hasSubtopicWildcard = true;
-            }
-        }
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Constants
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * The wildcard token for hierarchical subtopics.
-     */
-    public static final String SUBTOPIC_WILDCARD = "*";
-
-    
//--------------------------------------------------------------------------
-    //
-    // Variables
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * The full subtopic value.
-     */
-    private String subtopic;
-
-    /**
-     * The separator used if the subtopic is hierarchical.
-     */
-    private String separator;
-
-    /**
-     * Flag to store whether the subtopic is hierarchical.
-     */
-    private boolean hierarchical;
-
-    /**
-     * Flag to store whether the subtopic contains subtopic wildcards.
-     */
-    private boolean hasSubtopicWildcard;
-
-    
//--------------------------------------------------------------------------
-    //
-    // Methods
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Returns true if the subtopic contains a hierarchical subtopic wildcard.
-     *
-     * @return true if the subtopic contains a hierarchical subtopic wildcard,
-     *         otherwise false.
-     */
-    public boolean containsSubtopicWildcard()
-    {
-        return hasSubtopicWildcard;
-    }
-
-    /**
-     * Override of equals.
-     *
-     * @param other The object to compare against.
-     * @return <code>true</code> if other equals to this; <code>false</code> 
otherwise;
-     */
-    public boolean equals(Object other)
-    {
-        if (!(other instanceof Subtopic))
-            return false;
-        Subtopic otherSubtopic = (Subtopic) other;
-        return subtopic.equals(otherSubtopic.subtopic) &&
-                ((separator == null && otherSubtopic.separator == null) || 
(separator != null && separator.equals(otherSubtopic.separator)));
-    }
-
-    /**
-     * Returns the separator used to create this Subtopic instance.
-     * This value may be <code>null</code>.
-     *
-     * @return The separator used to create this Subtopic instance.
-     */
-    public String getSeparator()
-    {
-        return separator;
-    }
-
-    /**
-     * Returns the subtopic value used to create this Subtopic instance.
-     *
-     * @return The subtopic value used to create this Subtopic instance.
-     */
-    public String getValue()
-    {
-        return subtopic;
-    }
-
-    /**
-     * Override of hashCode. Hash using the subtopic String rather than the 
object's address.
-     *
-     * @return The hashCode.
-     */
-    public int hashCode()
-    {
-        return subtopic.hashCode();
-    }
-
-    /**
-     * Returns true is the subtopic is hierarchical.
-     *
-     * @return true if the subtopic is hierarchical, otherwise false.
-     */
-    public boolean isHierarchical()
-    {
-        return hierarchical;
-    }
-
-    /**
-     * Matches the passed subtopic against this subtopic.
-     * If neither subtopic contains a wildcard they must literally match.
-     * If one or the other contains a wildcard they may match.
-     * "chatrooms.*" will match "chatrooms.lobby" or "chatrooms.us.ca" but will
-     * not match "chatrooms" (assuming a subtopic separator of ".").
-     * "chatrooms.*.ca" will match "chatrooms.us.ca" but not "chatrooms.us.ma".
-     *
-     * @param other The other subtopic to match against this subtopic.
-     * @return true if they match, otherwise false.
-     */
-    public boolean matches(Subtopic other)
-    {
-        // If neither contain a wildcard, match them as simple Strings.
-        if (!hasSubtopicWildcard && !other.hasSubtopicWildcard)
-        {
-            return (subtopic.equals(other.subtopic)) ? true : false;
-        }
-        // Otherwise, this subtopic or the other contains a wildcard.
-        else
-        {
-            // If both are hierarchical but use different separators - don't 
match.
-            if (hierarchical && other.hierarchical && 
!separator.equals(other.separator))
-                return false;
-
-            StringTokenizer t1 = new StringTokenizer(subtopic, separator);
-            StringTokenizer t2 = new StringTokenizer(other.subtopic, 
other.separator);
-            int n = t1.countTokens();
-            int difference = n - t2.countTokens();
-
-            String tok1 = null;
-            String tok2 = null;
-            boolean matchToken;
-            while (n-- > 0)
-            {
-                tok1 = t1.nextToken();
-                matchToken = !tok1.equals(SUBTOPIC_WILDCARD);
-
-                if (t2.hasMoreTokens())
-                {
-                    tok2 = t2.nextToken();
-                    if (tok2.equals(SUBTOPIC_WILDCARD))
-                        continue;
-                }
-                else
-                {
-                    break; // No more tokens to compare.
-                }
-
-                if (matchToken && !tok1.equals(tok2))
-                    return false;
-            }
-
-            if (difference == 0)
-                return true;
-            else if ((difference < 0) && tok1.equals(SUBTOPIC_WILDCARD))
-                return true;
-            else return (difference > 0) && tok2.equals(SUBTOPIC_WILDCARD);
-        }
-    }
-
-    /**
-     * Override of toString.
-     *
-     * @return The subtopic string.
-     */
-    public String toString()
-    {
-        return subtopic;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/ThrottleManager.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/services/messaging/ThrottleManager.java 
b/core/src/flex/messaging/services/messaging/ThrottleManager.java
deleted file mode 100644
index 499c5f0..0000000
--- a/core/src/flex/messaging/services/messaging/ThrottleManager.java
+++ /dev/null
@@ -1,556 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import flex.management.ManageableComponent;
-import 
flex.management.runtime.messaging.services.messaging.ThrottleManagerControl;
-import flex.messaging.MessageException;
-import flex.messaging.config.ConfigurationException;
-import flex.messaging.config.ThrottleSettings;
-import flex.messaging.config.ThrottleSettings.Policy;
-import flex.messaging.log.Log;
-import flex.messaging.log.LogCategories;
-import flex.messaging.messages.Message;
-import flex.messaging.services.messaging.ThrottleManager.ThrottleResult.Result;
-
-/**
- *
- *
- * The ThrottleManager provides functionality to limit the frequency of 
messages
- * routed through the system in message/second terms. Message frequency can be 
managed
- * on a per-client basis and also on a per-destination basis by tweaking 
different
- * parameters. Each MessageDestination has one ThrottleManager.
- *
- * Message frequency can be throttled differently for incoming messages, which 
are messages
- * published by Flash/Flex producers, and for outgoing messages, which are 
messages
- * consumed by Flash/Flex subscribers that may have been produced by either 
Flash clients
- * or external message producers (such as data feeds, JMS publishers, etc).
- *
- */
-public class ThrottleManager extends ManageableComponent
-{
-    
//--------------------------------------------------------------------------
-    //
-    // Public Static Constants
-    //
-    
//--------------------------------------------------------------------------
-
-    public static final String LOG_CATEGORY = LogCategories.TRANSPORT_THROTTLE;
-    public static final String TYPE = "ThrottleManager";
-
-    
//--------------------------------------------------------------------------
-    //
-    // Private Static Constants
-    //
-    
//--------------------------------------------------------------------------
-
-    private static final Object classMutex = new Object();
-
-    
//--------------------------------------------------------------------------
-    //
-    // Private Static Variables
-    //
-    
//--------------------------------------------------------------------------
-
-    private static int instanceCount = 0;
-
-    
//--------------------------------------------------------------------------
-    //
-    // Constructor
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Constructs an unmanaged <code>ThrottleManager</code> instance.
-     */
-    public ThrottleManager()
-    {
-        this(false);
-    }
-
-    /**
-     * Constructs a <code>ThrottleManager</code> with the indicated management.
-     *
-     * @param enableManagement <code>true</code> if the 
<code>ThrottleManager</code>
-     * is manageable; otherwise <code>false</code>.
-     */
-    public ThrottleManager(boolean enableManagement)
-    {
-        super(enableManagement);
-        synchronized (classMutex)
-        {
-            super.setId(TYPE + ++instanceCount);
-        }
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Variables
-    //
-    
//--------------------------------------------------------------------------
-
-    protected ThrottleSettings settings;
-    private Map<String, MessageFrequency> inboundClientMarks;
-    private MessageFrequency inboundDestinationMark;
-    private MessageFrequency outboundDestinationMark;
-
-    
//--------------------------------------------------------------------------
-    //
-    // Initialize, validate, start, and stop methods.
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Starts the throttle manager.
-     */
-    @Override
-    public void start()
-    {
-        // Use the default ThrottleSettings if one is not set already.
-        if (settings == null)
-            settings = new ThrottleSettings();
-
-        if (settings.isDestinationThrottleEnabled())
-        {
-            inboundDestinationMark = new 
MessageFrequency(settings.getIncomingDestinationFrequency());
-            outboundDestinationMark = new 
MessageFrequency(settings.getOutgoingDestinationFrequency());
-        }
-
-        if (settings.isInboundClientThrottleEnabled())
-            inboundClientMarks = new HashMap<String, MessageFrequency>();
-    }
-
-
-    /**
-     * Stops the throttle manager.
-     */
-    @Override
-    public void stop()
-    {
-        super.stop();
-
-        // Remove management.
-        if (isManaged() && getControl() != null)
-        {
-            getControl().unregister();
-            setControl(null);
-            setManaged(false);
-        }
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Public Methods
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Given a policy, returns the result for that policy.
-     *
-     * @param policy The policy.
-     * @return The result for the policy.
-     */
-    public static Result getResult(Policy policy)
-    {
-        if (Policy.IGNORE == policy)
-            return Result.IGNORE;
-        else if (Policy.ERROR == policy)
-            return Result.ERROR;
-        else if (Policy.BUFFER == policy)
-            return Result.BUFFER;
-        else if (Policy.CONFLATE == policy)
-            return Result.CONFLATE;
-        return Result.OK;
-    }
-
-    /**
-     * Returns the outbound policy being used by the throttle manager.
-     *
-     * @return The outbound policy for the throttle manager.
-     */
-    public Policy getOutboundPolicy()
-    {
-        return settings == null? null : settings.getOutboundPolicy();
-    }
-
-
-    /**
-     * This is a no-op because throttle manager's id is generated internally.
-     *
-     * @param id The id.
-     */
-    @Override
-    public void setId(String id)
-    {
-        // No-op
-    }
-
-    /**
-     *
-     * Used by the MessageClient in its cleanup process.
-     *
-     * @param clientId The id of the MessageClient.
-     */
-    public void removeClientThrottleMark(Object clientId)
-    {
-        if (inboundClientMarks != null)
-            inboundClientMarks.remove(clientId);
-        // Note that the outBoundClientMarks that is maintained by the 
FlexClientOutboundQueueProcessor
-        // is cleaned up by FlexClient when MessageClient is unregistered with 
it.
-    }
-
-    /**
-     * Sets the throttling settings of the throttle manager.
-     *
-     * @param throttleSettings The throttling settings for the throttle 
manager.
-     */
-    public void setThrottleSettings(ThrottleSettings throttleSettings)
-    {
-        // Make sure that we have valid outbound policies.
-        Policy outPolicy = throttleSettings.getOutboundPolicy();
-        if (outPolicy != Policy.NONE && outPolicy != Policy.IGNORE)
-        {
-            ConfigurationException ex = new ConfigurationException();
-            ex.setMessage("Invalid outbound throttle policy '" + outPolicy
-                    + "' for destination '" + 
throttleSettings.getDestinationName()
-                    + "'. Valid values are 'NONE' and 'IGNORE'.");
-            throw ex;
-        }
-        settings = throttleSettings;
-    }
-
-    /**
-     * Attempts to throttle the incoming message at the destination and the 
client level.
-     *
-     * @param message Message to be throttled.
-     * @return True if the message was throttled; otherwise false.
-     */
-    public boolean throttleIncomingMessage(Message message)
-    {
-        // destination-level throttling comes before client-level, because if 
it
-        // fails then it doesn't matter what the client-level throttle reports.
-        ThrottleResult throttleResult = throttleDestinationLevel(message, 
true);
-        if (throttleResult.getResult() == Result.OK)
-        {
-            // client-level throttling allows the system to further refine a
-            // different throttle for individual clients, which may be a subset
-            // but never a superset of destination-level throttle settings
-            throttleResult = throttleIncomingClientLevel(message);
-            handleIncomingThrottleResult(message, throttleResult, true 
/*isClientLevel*/);
-            boolean throttled = throttleResult.getResult() != Result.OK;
-            if (!throttled)
-            {
-                updateMessageFrequencyDestinationLevel(true /* incoming */);
-                updateMessageFrequencyIncomingClientLevel(message);
-            }
-            return throttled;
-        }
-
-        handleIncomingThrottleResult(message, throttleResult, false 
/*isClientLevel*/);
-        boolean throttled = throttleResult.getResult() != Result.OK;
-        if (!throttled)
-        {
-            updateMessageFrequencyDestinationLevel(true /* incoming */);
-            updateMessageFrequencyIncomingClientLevel(message);
-        }
-        return throttled;
-    }
-
-    /**
-     * Attempts to throttle the outgoing message at the destination level only.
-     * Client level throttling is enforced at FlexClientOutboundQueueProcessor.
-     *
-     * @param message The message to be throttled.
-     * @return The result of throttling attempt.
-     */
-    public ThrottleResult throttleOutgoingMessage(Message message)
-    {
-        ThrottleResult throttleResult = throttleDestinationLevel(message, 
false);
-        // Outbound client-level throttling happens in 
FlexClientOutboundQueueProcessor.
-        handleOutgoingThrottleResult(message, throttleResult, false 
/*isClientLevel*/);
-        return throttleResult;
-    }
-
-    /**
-     * A utility method to handle outgoing throttling results in a common way.
-     *
-     * @param message The message that is being throttled.
-     * @param throttleResult The throttling result.
-     * @param isClientLevel Whether the message is being throttled at the 
client level
-     * or not.
-     */
-    public void handleOutgoingThrottleResult(Message message, ThrottleResult 
throttleResult, boolean isClientLevel)
-    {
-        Result result = throttleResult.getResult();
-
-        // Update the management metrics.
-        if (result != Result.OK && isManaged())
-        {
-            if (isClientLevel)
-                
((ThrottleManagerControl)getControl()).incrementClientOutgoingMessageThrottleCount();
-            else
-                
((ThrottleManagerControl)getControl()).incrementDestinationOutgoingMessageThrottleCount();
-        }
-
-        // Result can only be IGNORE (or NONE which means no throttling)
-        if (result == Result.IGNORE)
-        {
-            // Improve the detail message for IGNORE.
-            if (isClientLevel)
-                throttleResult.setDetail("Message '" + message.getMessageId() 
+ "' ignored: Too many messages sent to client '"
-                        + message.getClientId() + "' in too small of a time 
interval " + throttleResult.getDetail());
-            else
-                throttleResult.setDetail("Message '" + message.getMessageId() 
+ "' throttled: Too many messages routed by destination '"
-                        + message.getDestination() + "' in too small of a time 
interval " + throttleResult.getDetail());
-
-            if (Log.isInfo())
-                Log.getLogger(LOG_CATEGORY).info(throttleResult.getDetail());
-        }
-    }
-
-    /**
-     * Attempts to throttle destination-level incoming and outgoing messages.
-     *
-     * @param message Message to throttle.
-     * @param incoming Whether the message is incoming or outgoing.
-     * @return The result of the throttling attempt.
-     */
-    public ThrottleResult throttleDestinationLevel(Message message, boolean 
incoming)
-    {
-        if (incoming && settings.isInboundDestinationThrottleEnabled())
-        {
-            ThrottleResult result = 
inboundDestinationMark.checkLimit(settings.getIncomingDestinationFrequency(), 
settings.getInboundPolicy());
-            return result;
-        }
-        else if (!incoming && settings.isOutboundDestinationThrottleEnabled())
-        {
-            ThrottleResult result = 
outboundDestinationMark.checkLimit(settings.getOutgoingDestinationFrequency(), 
settings.getOutboundPolicy());
-            return result;
-        }
-        // Return the default OK result.
-        return new ThrottleResult();
-    }
-
-    /**
-     * Updates the destination level message frequency.
-     *
-     * param incoming Whether the message is incoming or outgoing.
-     */
-    public void updateMessageFrequencyDestinationLevel(boolean incoming)
-    {
-        if (incoming && settings.isInboundDestinationThrottleEnabled())
-            inboundDestinationMark.updateMessageFrequency();
-        else if (!incoming && settings.isOutboundDestinationThrottleEnabled())
-            outboundDestinationMark.updateMessageFrequency();
-    }
-
-    /**
-     * Updates the incoming client level message frequency.
-     */
-    public void updateMessageFrequencyIncomingClientLevel(Message message)
-    {
-        String clientId = (String)message.getClientId();
-        if (settings.isInboundClientThrottleEnabled())
-        {
-            MessageFrequency clientLevelMark = 
inboundClientMarks.get(clientId);
-            if (clientLevelMark != null)
-                clientLevelMark.updateMessageFrequency();
-        }
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Protected and private methods.
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Returns the log category for the throttle manager.
-     */
-    @Override
-    protected String getLogCategory()
-    {
-        return LOG_CATEGORY;
-    }
-
-    /**
-     * A utility method to handle incoming throttling results in a common way.
-     *
-     * @param message The message that is being throttled.
-     * @param throttleResult The throttling result.
-     * @param isClientLevel Whether the message is being throttled at the 
client level
-     * or not.
-     */
-    protected void handleIncomingThrottleResult(Message message, 
ThrottleResult throttleResult, boolean isClientLevel)
-    {
-        Result result = throttleResult.getResult();
-
-        // Update the management metrics.
-        if (result != Result.OK && isManaged())
-        {
-            if (isClientLevel)
-                
((ThrottleManagerControl)getControl()).incrementClientIncomingMessageThrottleCount();
-            else
-                
((ThrottleManagerControl)getControl()).incrementDestinationIncomingMessageThrottleCount();
-        }
-
-        // Result can be IGNORE or ERROR (or NONE which means no throttling).
-        if (result == Result.IGNORE || result == Result.ERROR)
-        {
-            if (isClientLevel)
-                throttleResult.setDetail("Message '" + message.getMessageId() 
+ "' throttled: Too many messages sent by the client '"
-                        + message.getClientId() + "' in too small of a time 
interval " + throttleResult.getDetail());
-            else
-                throttleResult.setDetail("Message '" + message.getMessageId() 
+ "' throttled: Too many messages sent to destination '"
-                    + message.getDestination() + "' in too small of a time 
interval " + throttleResult.getDetail());
-
-            String detail = throttleResult.getDetail();
-            if (result == Result.ERROR)
-            {
-                if (Log.isError())
-                    Log.getLogger(LOG_CATEGORY).error(detail);
-                // And, throw an exception, so the client gets the error.
-                MessageException me = new MessageException(detail);
-                throw me;
-            }
-            // Otherwise, just log it.
-            if (Log.isInfo())
-                Log.getLogger(LOG_CATEGORY).info(detail);
-        }
-    }
-
-    /**
-     * Attempts to throttle client-level incoming messages only. Client-level
-     * outgoing messages are throttled at the FlexClientOutboundQueueProcessor.
-     *
-     * @param message Message to throttle.
-     * @return The result of the throttling attempt.
-     */
-    protected ThrottleResult throttleIncomingClientLevel(Message message)
-    {
-        String clientId = (String)message.getClientId();
-        if (settings.isInboundClientThrottleEnabled())
-        {
-            MessageFrequency clientLevelMark;
-            clientLevelMark = inboundClientMarks.get(clientId);
-            if (clientLevelMark == null)
-                clientLevelMark = new 
MessageFrequency(settings.getIncomingClientFrequency());
-
-            ThrottleResult result = 
clientLevelMark.checkLimit(settings.getIncomingClientFrequency(), 
settings.getInboundPolicy());
-            inboundClientMarks.put(clientId, clientLevelMark);
-            return result;
-        }
-        // Return the default OK result.
-        return new ThrottleResult();
-    }
-
-    
//--------------------------------------------------------------------------
-    //
-    // Nested Classes
-    //
-    
//--------------------------------------------------------------------------
-
-    /**
-     * This class is used to keep track of throttling results.
-     */
-    public static class ThrottleResult
-    {
-        /**
-         * Result enum.
-         */
-        public enum Result
-        {
-            OK, IGNORE, ERROR, BUFFER, CONFLATE
-        }
-
-        private String detail;
-        private Result result;
-
-        /**
-         * Creates a ThrottleResult with Result.OK.
-         */
-        public ThrottleResult()
-        {
-            this(Result.OK);
-        }
-
-        /**
-         * Creates a ThrottleResult with the passed in Result.
-         *
-         * @param result The Result.
-         */
-        public ThrottleResult(Result result) // FIXME
-        {
-            this.result = result;
-        }
-
-        /**
-         * Creates a ThrottleResult with the passed in Result and detail.
-         *
-         * @param result The Result.
-         * @param detail The detail.
-         */
-        public ThrottleResult(Result result, String detail) // FIXME
-        {
-            this(result);
-            this.detail = detail;
-        }
-
-        /**
-         * Returns the detail.
-         *
-         * @return The detail.
-         */
-        public String getDetail()
-        {
-            return detail;
-        }
-
-        /**
-         * Sets the detail.
-         *
-         * @param detail The detail.
-         */
-        public void setDetail(String detail)
-        {
-            this.detail = detail;
-        }
-
-        /**
-         * Returns the result.
-         *
-         * @return The result.
-         */
-        public Result getResult()
-        {
-            return result;
-        }
-
-        /**
-         * Sets the result.
-         *
-         * @param result The result.
-         */
-        public void setResult(Result result)
-        {
-            this.result = result;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/ActionScriptAdapter.java
----------------------------------------------------------------------
diff --git 
a/core/src/flex/messaging/services/messaging/adapters/ActionScriptAdapter.java 
b/core/src/flex/messaging/services/messaging/adapters/ActionScriptAdapter.java
deleted file mode 100644
index fb00d0c..0000000
--- 
a/core/src/flex/messaging/services/messaging/adapters/ActionScriptAdapter.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging.adapters;
-
-import java.util.Set;
-
-import flex.messaging.services.MessageService;
-import flex.messaging.services.messaging.SubscriptionManager;
-import flex.messaging.messages.Message;
-import flex.messaging.Destination;
-import flex.messaging.MessageDestination;
-import 
flex.management.runtime.messaging.services.messaging.adapters.ActionScriptAdapterControl;
-
-/**
- * An ActionScript object based adapter for the MessageService
- * that supports simple publish/subscribe messaging between
- * ActionScript based clients.
- */
-public class ActionScriptAdapter extends MessagingAdapter
-{
-    private ActionScriptAdapterControl controller;
-
-    
//--------------------------------------------------------------------------
-    //
-    // Constructor
-    //
-    
//--------------------------------------------------------------------------
-    
-    /**
-     * Constructs a default <code>ActionScriptAdapter</code>.
-     */
-    public ActionScriptAdapter()
-    {
-        super();
-    }
-    
-    
//--------------------------------------------------------------------------
-    //
-    // Public Getters and Setters for ServiceAdapter properties
-    //                             
-    
//--------------------------------------------------------------------------
-
-    /**
-     * Casts the <code>Destination</code> into <code>MessageDestination</code>
-     * and calls super.setDestination.
-     * 
-     * @param destination
-     */
-    public void setDestination(Destination destination)
-    {
-        Destination dest = (MessageDestination)destination;
-        super.setDestination(dest);
-    }
-    
-    
//--------------------------------------------------------------------------
-    //
-    // Other Public APIs
-    //                 
-    
//--------------------------------------------------------------------------
-    
-    /**
-     * Handle a data message intended for this adapter.
-     */
-    public Object invoke(Message message)
-    {
-        MessageDestination destination = (MessageDestination)getDestination();
-        MessageService msgService = (MessageService)destination.getService();
-
-        SubscriptionManager subscriptionManager = 
destination.getSubscriptionManager();
-        Set subscriberIds = subscriptionManager.getSubscriberIds(message, true 
/*evalSelector*/);
-        if (subscriberIds != null && !subscriberIds.isEmpty())
-        {
-            /* We have already filtered based on the selector and so pass 
false below */
-            msgService.pushMessageToClients(destination, subscriberIds, 
message, false);
-        }
-        msgService.sendPushMessageFromPeer(message, destination, true);
-
-        return null;
-    }
-
-    /**
-     * Invoked automatically to allow the <code>ActionScriptAdapter</code> to 
setup its corresponding
-     * MBean control.
-     * 
-     * @param broker The <code>Destination</code> that manages this 
<code>ActionScriptAdapter</code>.
-     */
-    protected void setupAdapterControl(Destination destination)
-    {
-        controller = new ActionScriptAdapterControl(this, 
destination.getControl());
-        controller.register();
-        setControl(controller);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/AsyncMessageReceiver.java
----------------------------------------------------------------------
diff --git 
a/core/src/flex/messaging/services/messaging/adapters/AsyncMessageReceiver.java 
b/core/src/flex/messaging/services/messaging/adapters/AsyncMessageReceiver.java
deleted file mode 100644
index fb1c0f4..0000000
--- 
a/core/src/flex/messaging/services/messaging/adapters/AsyncMessageReceiver.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging.adapters;
-
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-/**
- * A <code>MessageReceiver</code> that receives messages asynchronously from 
JMS.
- *
- *
- */
-class AsyncMessageReceiver implements MessageReceiver, ExceptionListener, 
MessageListener
-{
-    private JMSConsumer jmsConsumer;
-
-    /**
-     * Constructs a new AsyncMessageReceiver.
-     *
-     * @param jmsConsumer JMSConsumer associated with the AsyncMessageReceiver.
-     */
-    public AsyncMessageReceiver(JMSConsumer jmsConsumer)
-    {
-        this.jmsConsumer = jmsConsumer;
-    }
-
-    /**
-     * Implements MessageReceiver.startReceive.
-     */
-    public void startReceive() throws JMSException
-    {
-        jmsConsumer.setMessageListener(this);
-    }
-
-    /**
-     * Implements MessageReceiver.stopReceive.
-     */
-    public void stopReceive()
-    {
-        // Nothing to do.
-    }
-
-    /**
-     * Implements javax.jms.ExceptionListener.onException.
-     *
-     * @param exception JMS exception received from the JMS server.
-     */
-    public void onException(JMSException exception)
-    {
-        jmsConsumer.onException(exception);
-    }
-
-    /**
-     * Implements javax.jms.MessageListener.onMessage.
-     *
-     * @param message JMS message received from the JMS server.
-     */
-    public void onMessage(Message message)
-    {
-        jmsConsumer.onMessage(message);
-    }
-}

Reply via email to