http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/RemoteSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/RemoteSubscriptionManager.java b/core/src/flex/messaging/services/messaging/RemoteSubscriptionManager.java deleted file mode 100644 index c1a3096..0000000 --- a/core/src/flex/messaging/services/messaging/RemoteSubscriptionManager.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging; - -import flex.messaging.MessageDestination; -import flex.messaging.MessageClient; -import flex.messaging.services.MessageService; -import flex.messaging.log.Log; -import flex.messaging.util.StringUtils; -import flex.messaging.cluster.RemoveNodeListener; - -import java.util.List; - -/** - * The RemoteSubscriptionManager monitors subscriptions from other - * servers, not other clients. One MessageClient instance is used for - * each remote server. It clientId is the address of the remote server. - * Using a separate instance of this class keeps the subscriptions - * of local clients separate from remote clients. - * - * - */ -public class RemoteSubscriptionManager extends SubscriptionManager implements RemoveNodeListener -{ - private Object syncLock = new Object(); - - /* - * A monitor lock used for synchronizing the attempt to request subscriptions - * across the cluster during startup. - */ - private static final Object initRemoteSubscriptionsLock = new Object(); - - public RemoteSubscriptionManager(MessageDestination destination) - { - this(destination, false); - } - - public RemoteSubscriptionManager(MessageDestination destination, boolean enableManagement) - { - super(destination, enableManagement); - } - - public void setSessionTimeout(long sessionConfigValue) - { - } - - public long getSessionTimeout() - { - return 0; // not used for remote subscriptions - } - - public void addSubscriber(String flexClientId, Object clientId, String selector, String subtopic) - { - synchronized (syncLock) - { - /* - * Only process subscriptions for servers whose subscription state we have received - * We may receive a subscribe/unsubscribe from a peer before we get their - * subscription state... we ignore these since they will be included in the - * state we receive later - */ - if (allSubscriptions.get(clientId) != null) - super.addSubscriber(clientId, selector, subtopic, null); - else if (Log.isDebug()) - Log.getLogger(MessageService.LOG_CATEGORY).debug("Ignoring new remote subscription for server: " + clientId + " whose subscription state we have not yet received. selector: " + selector + " subtopic: " + subtopic); - } - } - - public void removeSubscriber(String flexClientId, Object clientId, String selector, String subtopic, String endpoint) - { - synchronized (syncLock) - { - /* Only process subscriptions for servers whose subscription state we have received */ - if (allSubscriptions.get(clientId) != null) - super.removeSubscriber(clientId, selector, subtopic, null); - } - } - - protected void sendSubscriptionToPeer(boolean subscribe, String selector, String subtopic) - { - // Don't do this for remote subscriptions - } - - protected MessageClient newMessageClient(Object clientId, String endpointId) - { - return new RemoteMessageClient(clientId, destination, endpointId); - } - - /** - * Takes the selector and subtopic list from this address and - * for each one create a RemoteSubscription which gets - * registered in this table. We also register the remote - * subscription with a "per server" index so we can easily - * remove them later on. - * @param state the subscription state object - * @param address the remote cluster node address - */ - public void setSubscriptionState(Object state, Object address) - { - MessageClient client = newMessageClient(address, null); - - if (Log.isDebug()) - Log.getLogger(MessageService.LOG_CATEGORY).debug("Received subscription state for destination: " + destination.getId() + " from server: " + address + StringUtils.NEWLINE + state); - - /* - * need to be sure we do not accept any remote sub/unsub messages - * from a given server until we have received its subscription state - * - * Also, we need to ensure we do not process any remote subscribe/unsubs - * from a remote server until we finish this list. - */ - synchronized (syncLock) - { - allSubscriptions.put(address, client); - - List list = (List) state; - - for (int i = 0; i < list.size(); i+=2) - { - addSubscriber(null, address, (String) list.get(i), (String) list.get(i+1)); - } - } - synchronized (initRemoteSubscriptionsLock) - { - initRemoteSubscriptionsLock.notifyAll(); - } - } - - /** - * This method waits for some time for the receipt of the subscription - * state for the server with the given address. If we fail to receive - * a message after waiting for the 5 seconds, a warning is printed. - * @param addr the remote cluster node address - */ - public void waitForSubscriptions(Object addr) - { - /* If we have not gotten the response yet from this client... */ - if (getSubscriber(addr) == null) - { - synchronized (initRemoteSubscriptionsLock) - { - try - { - if (Log.isDebug()) - Log.getLogger(MessageService.LOG_CATEGORY).debug("Waiting for subscriptions from cluster node: " + addr + " for destination: " + destination.getId()); - - initRemoteSubscriptionsLock.wait(5000); - - if (Log.isDebug()) - Log.getLogger(MessageService.LOG_CATEGORY).debug("Done waiting for subscriptions from cluster node: " + addr + " for destination: " + destination.getId()); - } - catch (InterruptedException exc) {} - } - if (getSubscriber(addr) == null && Log.isWarn()) - Log.getLogger(MessageService.LOG_CATEGORY).warn("No response yet from request subscriptions request for server: " + addr + " for destination: " + destination.getId()); - } - else if (Log.isDebug()) - Log.getLogger(MessageService.LOG_CATEGORY).debug("Already have subscriptions from server: " + addr + " for destination: " + destination.getId()); - } - - /** - * Called when a cluster node gets removed. We need to make sure that all subscriptions - * for this node are removed. - * @param address the remote cluster node address - */ - public void removeClusterNode(Object address) - { - if (Log.isDebug()) - Log.getLogger(MessageService.LOG_CATEGORY).debug("Cluster node: " + address + " subscriptions being removed for destination:" + destination.getId() + " before: " + StringUtils.NEWLINE + getDebugSubscriptionState()); - - MessageClient client = getSubscriber(address); - if (client != null) - { - client.invalidate(); - } - - if (Log.isDebug()) - Log.getLogger(MessageService.LOG_CATEGORY).debug("Cluster node: " + address + " subscriptions being removed for destination:" + destination.getId() + " after: " + StringUtils.NEWLINE + getDebugSubscriptionState()); - } - - protected void monitorTimeout(MessageClient client) - { - // Remote subscriptions do not timeout - } - -}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/SubscriptionManager.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/SubscriptionManager.java b/core/src/flex/messaging/services/messaging/SubscriptionManager.java deleted file mode 100644 index 9e53cf1..0000000 --- a/core/src/flex/messaging/services/messaging/SubscriptionManager.java +++ /dev/null @@ -1,915 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging; - -import flex.management.ManageableComponent; -import flex.messaging.FlexContext; -import flex.messaging.MessageClient; -import flex.messaging.MessageDestination; -import flex.messaging.MessageException; -import flex.messaging.client.FlexClient; -import flex.messaging.config.ServerSettings.RoutingMode; -import flex.messaging.log.Log; -import flex.messaging.messages.AsyncMessage; -import flex.messaging.messages.Message; -import flex.messaging.security.MessagingSecurity; -import flex.messaging.services.MessageService; -import flex.messaging.services.ServiceAdapter; -import flex.messaging.services.ServiceException; -import flex.messaging.services.messaging.selector.JMSSelector; -import flex.messaging.services.messaging.selector.JMSSelectorException; -import flex.messaging.util.StringUtils; -import flex.messaging.util.TimeoutManager; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadFactory; - -/** - * - * The SubscriptionManager monitors subscribed clients for MessageService - * and its subclasses, such as DataService. - */ -public class SubscriptionManager extends ManageableComponent -{ - public static final String TYPE = "SubscriptionManager"; - private static final int SUBTOPICS_NOT_SUPPORTED = 10553; - private static final int WILDCARD_SUBTOPICS_NOT_ALLOWED = 10560; - private static final Object classMutex = new Object(); - private static int instanceCount = 0; - - /** - * clientId to MessageClient Map for any subscriber. Note that clientId is tracked as - * Object instead of String because in clustering, clientId is not guaranteed to be String. - */ - protected final Map<Object, MessageClient> allSubscriptions = new ConcurrentHashMap<Object, MessageClient>(); - // This lock protects allSubscriptions as synchronizing on a Concurrent class does not work. - private final Object allSubscriptionsLock = new Object(); - - /** Subscriptions with no subtopic. */ - private final TopicSubscription globalSubscribers = new TopicSubscription(); - - /** Subscriptions with a simple subtopic. */ - private final Map<Subtopic, TopicSubscription> subscribersPerSubtopic = new ConcurrentHashMap<Subtopic, TopicSubscription>(); - - /** Subscriptions with a wildcard subtopic. */ - private final Map<Subtopic, TopicSubscription> subscribersPerSubtopicWildcard = new ConcurrentHashMap<Subtopic, TopicSubscription>(); - - protected final MessageDestination destination; - // We can either timeout subscriptions by session expiration (idleSubscriptionTimeout=0) or by an explicit - // timeout. If we time them out by timeout, this refers to the TimeoutManager - // we use to monitor session timeouts. - private TimeoutManager subscriberSessionManager; - private long subscriptionTimeoutMillis; - - /** - * Construct a subscription manager for a destination. - * - * @param destination the destination - */ - public SubscriptionManager(MessageDestination destination) - { - this(destination, false); - } - - /** - * Construct a subscription manager for a destination. - * - * @param destination the destination - * @param enableManagement turn on management? - */ - public SubscriptionManager(MessageDestination destination, boolean enableManagement) - { - super(enableManagement); - synchronized (classMutex) - { - super.setId(TYPE + ++instanceCount); - } - this.destination = destination; - - subscriptionTimeoutMillis = 0; - } - - // This component's id should never be changed as it's generated internally - /** {@inheritDoc} */ - @Override public void setId(String id) - { - // No-op - } - - /** - * Stops the subscription manager. - */ - @Override public void stop() - { - super.stop(); - - // Remove management. - if (isManaged() && getControl() != null) - { - getControl().unregister(); - setControl(null); - setManaged(false); - } - - // Destroy subscriptions - synchronized (this) - { - if (subscriberSessionManager != null) - { - subscriberSessionManager.shutdown(); - subscriberSessionManager = null; - } - } - - synchronized (allSubscriptionsLock) - { - if (!allSubscriptions.isEmpty()) - { - for (Map.Entry<Object, MessageClient> objectMessageClientEntry : allSubscriptions.entrySet()) - removeSubscriber(objectMessageClientEntry.getValue()); - } - } - } - - /** - * Set the timeout value. Creates a timeout manager thread if needed. - * - * @param value the timeout value in milliseconds - */ - public void setSubscriptionTimeoutMillis(long value) - { - subscriptionTimeoutMillis = value; - if (subscriptionTimeoutMillis > 0) - { - subscriberSessionManager = new TimeoutManager(new ThreadFactory() - { - int counter = 1; - public synchronized Thread newThread(Runnable runnable) - { - Thread t = new Thread(runnable); - t.setName(destination.getId() + "-SubscriptionTimeoutThread-" + counter++); - return t; - } - }); - } - } - - /** - * Returns the subscription timeout. - * - * @return the timeout in milliseconds - */ - public long getSubscriptionTimeoutMillis() - { - return subscriptionTimeoutMillis; - } - - /** - * Implement a serializer instance which wraps the subscription - * manager in a transient variable. It will need to block out - * all sub/unsub messages before they are broadcast to the - * remote server, iterate through the maps of subscriptions and - * for each "unique" subscription it writes the selector and - * subtopic. - * - * synchronization note: this assumes no add/remove subscriptions - * are occurring while this method is called. - * - * @return a List of subscriptions selectors and subtopics - */ - public Object getSubscriptionState() - { - ArrayList<String> subState = new ArrayList<String>(); - - if (globalSubscribers.defaultSubscriptions != null && - !globalSubscribers.defaultSubscriptions.isEmpty()) - { - subState.add(null); // selector string - subState.add(null); // subtopic string - } - if (globalSubscribers.selectorSubscriptions != null) - { - for (String s : globalSubscribers.selectorSubscriptions.keySet()) - { - subState.add(s); - subState.add(null); // subtopic - } - } - addSubscriptionState(subState, subscribersPerSubtopic); - addSubscriptionState(subState, subscribersPerSubtopicWildcard); - - if (Log.isDebug()) - Log.getLogger(MessageService.LOG_CATEGORY).debug("Retrieved subscription state to send to new cluster member for destination: " + destination.getId() + ": " + StringUtils.NEWLINE + subState); - - return subState; - } - - private void addSubscriptionState(List<String> subState, Map<Subtopic, TopicSubscription> subsPerSubtopic) - { - for (Map.Entry<Subtopic, TopicSubscription> entry : subsPerSubtopic.entrySet()) - { - Subtopic subtopic = entry.getKey(); - TopicSubscription tc = entry.getValue(); - - if (tc.defaultSubscriptions != null && !tc.defaultSubscriptions.isEmpty()) - { - subState.add(null); - subState.add(subtopic.toString()); - } - if (tc.selectorSubscriptions != null) - { - for (String s : tc.selectorSubscriptions.keySet()) - { - subState.add(s); - subState.add(subtopic.toString()); // subtopic - } - } - } - - } - - /** - * Get a string representation of the subscription state. - * @return the string - */ - protected String getDebugSubscriptionState() - { - StringBuffer sb = new StringBuffer(100); - - sb.append(" global subscriptions: ").append(globalSubscribers).append(StringUtils.NEWLINE); - sb.append(" regular subtopic subscriptions: ").append(subscribersPerSubtopic).append(StringUtils.NEWLINE); - sb.append(" wildcard subtopic subscriptions: ").append(subscribersPerSubtopicWildcard).append(StringUtils.NEWLINE); - return sb.toString(); - } - - @Override protected String getLogCategory() - { - return MessageService.LOG_CATEGORY; - } - - /** - * Return the ids of our subscribers. - * @return a set of subscriber ids - */ - public Set<Object> getSubscriberIds() - { - return allSubscriptions.keySet(); - } - - /** - * Return the set of subscribers for a message. - * - * @param message the message - * @param evalSelector should the selector be evaluated? - * @return the set of subscribers - */ - public Set<Object> getSubscriberIds(Message message, boolean evalSelector) - { - Set<Object> ids = new LinkedHashSet<Object>(); - - Object subtopicObj = message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME); - - if (subtopicObj instanceof Object[]) - subtopicObj = Arrays.asList((Object[])subtopicObj); - - if (subtopicObj instanceof String) - { - String subtopicString = (String) subtopicObj; - - if (subtopicString.length() > 0) - addSubtopicSubscribers(subtopicString, message, ids, evalSelector); - else - addTopicSubscribers(globalSubscribers, message, ids, evalSelector); - } - else if (subtopicObj instanceof List) - { - @SuppressWarnings("unchecked") - List<String> subtopicList = (List<String>)subtopicObj; - for (String aSubtopicList : subtopicList) - { - addSubtopicSubscribers(aSubtopicList, message, ids, evalSelector); - } - } - else - addTopicSubscribers(globalSubscribers, message, ids, evalSelector); - - return ids; - } - - /** - * Return the set of subscribers for a message. - * - * @param message the message - * @param evalSelector hould the selector be evaluated? - * @param subtopics the subtopics to use - * @return the set of subscribers - */ - public Set<Object> getSubscriberIds(Message message, boolean evalSelector, List<Subtopic> subtopics) - { - Set<Object> ids = new LinkedHashSet<Object>(); - - if (subtopics == null || subtopics.isEmpty()) - { - addTopicSubscribers(globalSubscribers, message, ids, evalSelector); - } - else - { - for (Subtopic subtopic : subtopics) - { - addSubtopicSubscribers(subtopic, message, ids, evalSelector); - } - } - return ids; - } - - /** - * Return the set of subscribers for a subtopic pattern. - * Constructs a message and calls {@link #getSubscriberIds(flex.messaging.messages.Message, boolean)}. - * - * @param subtopicPattern the pattern to match - * @param messageHeaders the message headers - * @return the set of subscribers - */ - public Set<Object> getSubscriberIds(String subtopicPattern, Map messageHeaders) - { - // This could be more efficient but we'd have to change the SQLParser to accept a map. - Message msg = new AsyncMessage(); - msg.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subtopicPattern); - if (messageHeaders != null) - msg.setHeaders(messageHeaders); - return getSubscriberIds(msg, true); - } - - void addSubtopicSubscribers(String subtopicString, Message message, Set<Object> ids, boolean evalSelector) - { - Subtopic subtopic = getSubtopic(subtopicString); - addSubtopicSubscribers(subtopic, message, ids, evalSelector); - } - - void addSubtopicSubscribers(Subtopic subtopic, Message message, Set<Object> ids, boolean evalSelector) - { - // If we have a subtopic, we need to route the message only to that - // subset of subscribers. - if (!destination.getServerSettings().getAllowSubtopics()) - { - // Throw an error - the destination doesn't allow subtopics. - ServiceException se = new ServiceException(); - se.setMessage(SUBTOPICS_NOT_SUPPORTED, new Object[] {subtopic.getValue(), destination.getId()}); - throw se; - } - - // Give a MessagingAdapter a chance to block the send to this subtopic. - ServiceAdapter adapter = destination.getAdapter(); - if (adapter instanceof MessagingSecurity) - { - if (!((MessagingSecurity)adapter).allowSend(subtopic)) - { - ServiceException se = new ServiceException(); - se.setMessage(10558, new Object[] {subtopic.getValue()}); - throw se; - } - } - - TopicSubscription ts; - if (subscribersPerSubtopic.containsKey(subtopic)) - { - ts = subscribersPerSubtopic.get(subtopic); - addTopicSubscribers(ts, message, ids, evalSelector); - } - - /* - * TODO: performance - organize these into a tree so we can find consumers via - * a hashtable lookup rather than a linear search - */ - Set<Subtopic> subtopics = subscribersPerSubtopicWildcard.keySet(); - if (!subtopics.isEmpty()) - { - for (Subtopic st : subtopics) - { - if (st.matches(subtopic)) - { - ts = subscribersPerSubtopicWildcard.get(st); - addTopicSubscribers(ts, message, ids, evalSelector); - } - } - } - } - - void addTopicSubscribers(TopicSubscription ts, Message message, Set<Object> ids, boolean evalSelector) - { - if (ts == null) - return; - - Map<Object, MessageClient> subs = ts.defaultSubscriptions; - if (subs != null) - ids.addAll(subs.keySet()); - - if (ts.selectorSubscriptions == null) - return; - - for (Map.Entry<String, Map<Object, MessageClient>> entry : ts.selectorSubscriptions.entrySet()) - { - String selector = entry.getKey(); - subs = entry.getValue(); - - if (!evalSelector) - { - ids.addAll(subs.keySet()); - } - else - { - JMSSelector jmsSel = new JMSSelector(selector); - try - { - if (jmsSel.match(message)) - { - ids.addAll(subs.keySet()); - } - } - catch (JMSSelectorException jmse) - { - if (Log.isWarn()) - { - Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error processing message selector: " + - jmsSel + StringUtils.NEWLINE + - " incomingMessage: " + message + StringUtils.NEWLINE + - " selector: " + selector); - } - } - } - } - } - - /** - * Returns the requested subscriber. - * If the subscriber exists it is also registered for subscription timeout if necessary. - * If the subscriber is not found this method returns null. - * - * @param clientId The clientId of the target subscriber. - * @return The subscriber, or null if the subscriber is not found. - */ - public MessageClient getSubscriber(Object clientId) - { - MessageClient client = allSubscriptions.get(clientId); - if (client != null && !client.isTimingOut()) - monitorTimeout(client); - return client; - } - - /** - * Removes the subscriber, unsubscribing it from all current subscriptions. - * This is used by the admin UI. - * - * @param client the client - */ - public void removeSubscriber(MessageClient client) - { - // Sends unsub messages for each subscription for this MessageClient which - // should mean we remove the client at the end. - client.invalidate(); - - if (getSubscriber(client.getClientId()) != null) - Log.getLogger(MessageService.LOG_CATEGORY).error("Failed to remove client: " + client.getClientId()); - } - - /** - * Add a subscriber. - * - * @param clientId the client id - * @param selector the selector - * @param subtopicString the subtopic - * @param endpointId the endpoint - */ - public void addSubscriber(Object clientId, String selector, String subtopicString, String endpointId) - { - addSubscriber(clientId, selector, subtopicString, endpointId, 0); - } - - /** - * Add a subscriber. - * - * @param clientId the client id - * @param selector the selector - * @param subtopicString the subtopic - * @param endpointId the endpoint - * @param maxFrequency maximum frequency - */ - public void addSubscriber(Object clientId, String selector, String subtopicString, String endpointId, int maxFrequency) - { - Subtopic subtopic = getSubtopic(subtopicString); - MessageClient client = null; - TopicSubscription topicSub; - Map<Object, MessageClient> subs; - Map<Subtopic, TopicSubscription> map; - - try - { - // Handle resubscribes from the same client and duplicate subscribes from different clients - boolean subscriptionAlreadyExists = (getSubscriber(clientId) != null); - client = getMessageClient(clientId, endpointId); - - FlexClient flexClient = FlexContext.getFlexClient(); - if (subscriptionAlreadyExists) - { - // Block duplicate subscriptions from multiple FlexClients if they - // attempt to use the same clientId. (when this is called from a remote - // subscription, there won't be a flex client so skip this test). - if (flexClient != null && !flexClient.getId().equals(client.getFlexClient().getId())) - { - ServiceException se = new ServiceException(); - se.setMessage(10559, new Object[] {clientId}); - throw se; - } - - // It's a resubscribe. Reset the endpoint push state for the subscription to make sure its current - // because a resubscribe could be arriving over a new endpoint or a new session. - client.resetEndpoint(endpointId); - } - - ServiceAdapter adapter = destination.getAdapter(); - client.updateLastUse(); - - if (subtopic == null) - { - topicSub = globalSubscribers; - } - else - { - if (!destination.getServerSettings().getAllowSubtopics()) - { - // Throw an error - the destination doesn't allow subtopics. - ServiceException se = new ServiceException(); - se.setMessage(SUBTOPICS_NOT_SUPPORTED, new Object[] {subtopicString, destination.getId()}); - throw se; - } - - if (subtopic.containsSubtopicWildcard() && destination.getServerSettings().isDisallowWildcardSubtopics()) - { - // Attempt to subscribe to the subtopic, ''{0}'', on destination, ''{1}'', that does not allow wilcard subtopics failed. - ServiceException se = new ServiceException(); - se.setMessage(WILDCARD_SUBTOPICS_NOT_ALLOWED, new Object[] {subtopicString, destination.getId()}); - throw se; - } - - // Give a MessagingAdapter a chance to block the subscribe. - if ((adapter instanceof MessagingSecurity) && (subtopic != null)) - { - if (!((MessagingSecurity)adapter).allowSubscribe(subtopic)) - { - ServiceException se = new ServiceException(); - se.setMessage(10557, new Object[] {subtopicString}); - throw se; - } - } - - /* - * If there is a wildcard, we always need to match that subscription - * against the producer. If it has no wildcard, we can do a quick - * lookup to find the subscribers. - */ - if (subtopic.containsSubtopicWildcard()) - map = subscribersPerSubtopicWildcard; - else - map = subscribersPerSubtopic; - - synchronized (this) - { - topicSub = map.get(subtopic); - if (topicSub == null) - { - topicSub = new TopicSubscription(); - map.put(subtopic, topicSub); - } - } - } - - /* Subscribing with no selector */ - if (selector == null) - { - subs = topicSub.defaultSubscriptions; - if (subs == null) - { - synchronized (this) - { - if ((subs = topicSub.defaultSubscriptions) == null) - topicSub.defaultSubscriptions = subs = new ConcurrentHashMap<Object, MessageClient>(); - } - } - } - /* Subscribing with a selector - store all subscriptions under the selector key */ - else - { - synchronized (this) - { - if (topicSub.selectorSubscriptions == null) - topicSub.selectorSubscriptions = new ConcurrentHashMap<String, Map<Object, MessageClient>>(); - } - - subs = topicSub.selectorSubscriptions.get(selector); - if (subs == null) - { - synchronized (this) - { - if ((subs = topicSub.selectorSubscriptions.get(selector)) == null) - topicSub.selectorSubscriptions.put(selector, subs = new ConcurrentHashMap<Object, MessageClient>()); - } - } - } - - if (subs.containsKey(clientId)) - { - /* I'd rather this be an error but in 2.0 we allowed this without error */ - if (Log.isWarn()) - Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Client: " + clientId + " already subscribed to: " + destination.getId() + " selector: " + selector + " subtopic: " + subtopicString); - } - else - { - client.addSubscription(selector, subtopicString, maxFrequency); - synchronized (this) - { - /* - * Make sure other members of the cluster know that we are subscribed to - * this info if we are in server-to-server mode - * - * This has to be done in the synchronized section so that we properly - * order subscribe and unsubscribe messages for our peers so their - * subscription state matches the one in the local server. - */ - if (subs.isEmpty() && destination.isClustered() && - destination.getServerSettings().getRoutingMode() == RoutingMode.SERVER_TO_SERVER) - sendSubscriptionToPeer(true, selector, subtopicString); - subs.put(clientId, client); - } - monitorTimeout(client); // local operation, timeouts on remote host are not started until failover - - // Finally, if a new MessageClient was created, notify its created - // listeners now that MessageClient's subscription state is setup. - if (!subscriptionAlreadyExists) - client.notifyCreatedListeners(); - } - } - finally - { - releaseMessageClient(client); - } - - } - - /** - * Remove a subscriber. - * - * @param clientId the client id - * @param selector the selector - * @param subtopicString the subtopic - * @param endpointId the endpoint - */ - public void removeSubscriber(Object clientId, String selector, String subtopicString, String endpointId) - { - MessageClient client = null; - try - { - synchronized (allSubscriptionsLock) - { - // Do a simple lookup first to avoid the creation of a new MessageClient instance - // in the following call to getMessageClient() if the subscription is already removed. - client = allSubscriptions.get(clientId); - if (client == null) // Subscription was already removed. - return; - - // Re-get in order to track refs correctly. - client = getMessageClient(clientId, endpointId); - } - - Subtopic subtopic = getSubtopic(subtopicString); - TopicSubscription topicSub; - Map<Object, MessageClient> subs; - Map<Subtopic, TopicSubscription> map = null; - - if (subtopic == null) - { - topicSub = globalSubscribers; - } - else - { - if (subtopic.containsSubtopicWildcard()) - map = subscribersPerSubtopicWildcard; - else - map = subscribersPerSubtopic; - - topicSub = map.get(subtopic); - - if (topicSub == null) - throw new MessageException("Client: " + clientId + " not subscribed to subtopic: " + subtopic); - } - - if (selector == null) - subs = topicSub.defaultSubscriptions; - else - subs = topicSub.selectorSubscriptions.get(selector); - - if (subs == null || subs.get(clientId) == null) - throw new MessageException("Client: " + clientId + " not subscribed to destination with selector: " + selector); - - synchronized (this) - { - subs.remove(clientId); - if (subs.isEmpty() && - destination.isClustered() && destination.getServerSettings().getRoutingMode() == RoutingMode.SERVER_TO_SERVER) - sendSubscriptionToPeer(false, selector, subtopicString); - - if (subs.isEmpty()) - { - if (selector != null) - { - if (topicSub.selectorSubscriptions != null && !topicSub.selectorSubscriptions.isEmpty()) - topicSub.selectorSubscriptions.remove(selector); - } - - if (subtopic != null && - (topicSub.selectorSubscriptions == null || topicSub.selectorSubscriptions.isEmpty()) && - (topicSub.defaultSubscriptions == null || topicSub.defaultSubscriptions.isEmpty())) - { - if ((topicSub.selectorSubscriptions == null || topicSub.selectorSubscriptions.isEmpty()) && - (topicSub.defaultSubscriptions == null || topicSub.defaultSubscriptions.isEmpty())) - map.remove(subtopic); - } - } - } - - if (client.removeSubscription(selector, subtopicString)) - { - allSubscriptions.remove(clientId); - client.invalidate(); // Destroy the MessageClient. - } - } - finally - { - if (client != null) - releaseMessageClient(client); - } - } - - /** - * Create a new MessageClient object. - * - * @param clientId the client id - * @param endpointId the endpoint - * @return constructed MessageClient - */ - protected MessageClient newMessageClient(Object clientId, String endpointId) - { - return new MessageClient(clientId, destination, endpointId, true); - } - - /** - * This method is used for subscribers who maintain client ids in their - * own subscription tables. It ensures we have the MessageClient for - * a given clientId for as long as this session is valid (or the - * subscription times out). - * - * @param clientId the client id - * @param endpointId the endpoint - * @return registered MessageClient - */ - public MessageClient registerMessageClient(Object clientId, String endpointId) - { - MessageClient client = getMessageClient(clientId, endpointId); - - monitorTimeout(client); - - /* - * There is only one reference to the MessageClient for the - * registered flag. If someone happens to register the - * same client more than once, just allow that to add one reference. - */ - if (client.isRegistered()) - releaseMessageClient(client); - else - client.setRegistered(true); - - return client; - } - - /** - * Return a message client, creating it if needed. - * - * @param clientId the client if - * @param endpointId the endpoint - * @return the MessageClient - */ - public MessageClient getMessageClient(Object clientId, String endpointId) - { - synchronized (allSubscriptionsLock) - { - MessageClient client = allSubscriptions.get(clientId); - if (client == null) - { - client = newMessageClient(clientId, endpointId); - allSubscriptions.put(clientId, client); - } - - client.incrementReferences(); - return client; - } - } - - /** - * Release a client. - * - * @param client the client to release - */ - public void releaseMessageClient(MessageClient client) - { - if (client == null) - return; - - synchronized (allSubscriptionsLock) - { - if (client.decrementReferences()) - { - allSubscriptions.remove(client.getClientId()); - client.invalidate(); // Destroy the MessageClient. - } - } - } - - /** - * Turn on timeout for the provided client. - * - * @param client the client - */ - protected void monitorTimeout(MessageClient client) - { - if (subscriberSessionManager != null) - { - synchronized (client) - { - if (!client.isTimingOut()) - { - subscriberSessionManager.scheduleTimeout(client); - client.setTimingOut(true); - } - } - } - } - - private Subtopic getSubtopic(String subtopic) - { - return subtopic != null? - new Subtopic(subtopic, destination.getServerSettings().getSubtopicSeparator()) : null; - } - - /** - * Broadcast this subscribe/unsubscribe message to the cluster so everyone is aware - * of this server's interest in messages matching this selector and subtopic. - * - * @param subscribe are we subscribing? - * @param selector the selector - * @param subtopic the subtopic - */ - protected void sendSubscriptionToPeer(boolean subscribe, String selector, String subtopic) - { - if (Log.isDebug()) - Log.getLogger(MessageService.LOG_CATEGORY).debug("Sending subscription to peers for subscribe? " + subscribe + " selector: " + selector + " subtopic: " + subtopic); - - ((MessageService)destination.getService()).sendSubscribeFromPeer(destination.getId(), subscribe, selector, subtopic); - } - - static class TopicSubscription - { - /** This is the Map of clientId to MessageClient for each client subscribed to this topic with no selector. */ - Map<Object, MessageClient> defaultSubscriptions; - - /** A map of selector string to Map of clientId to MessageClient. */ - Map<String, Map<Object, MessageClient>> selectorSubscriptions; - - @Override public String toString() - { - StringBuffer sb = new StringBuffer(100); - - sb.append("default subscriptions: ").append(defaultSubscriptions).append(StringUtils.NEWLINE); - sb.append("selector subscriptions: ").append(selectorSubscriptions).append(StringUtils.NEWLINE); - return sb.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/Subtopic.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/Subtopic.java b/core/src/flex/messaging/services/messaging/Subtopic.java deleted file mode 100644 index ba641b0..0000000 --- a/core/src/flex/messaging/services/messaging/Subtopic.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging; - -import flex.messaging.services.ServiceException; - -import java.util.StringTokenizer; - -/** - * Represents a message destination subtopic. You are given instances of Subtopics - * as arguments to the MessagingAdapter.allowSubscribe and allowSend methods. These - * are used to implement your own authorization for the subscribe and send messages - * to specific subtopics. - */ -public class Subtopic -{ - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructor. - * - * - * - * @param subtopic The full subtopic string. - * @param separator The separator for tokenizing a hierarchical subtopic. - */ - public Subtopic(String subtopic, String separator) - { - this.subtopic = subtopic; - this.separator = separator; - - // Subtopic cannot be zero length. - if (subtopic.length() == 0) - { - ServiceException se = new ServiceException(); - se.setMessage(10554, new Object[] {subtopic}); - throw se; - } - // Validate subtopic format if it contains a separator. - else if ((separator != null) && (subtopic.indexOf(separator) != -1)) - { - hierarchical = true; - /* - * Each token must have non-zero length, meaning no leading or trailing - * separator or empty subtopics in between. - */ - if (subtopic.startsWith(separator) || - subtopic.endsWith(separator) || - (subtopic.indexOf(separator + separator) != -1)) - { - ServiceException se = new ServiceException(); - se.setMessage(10554, new Object[] {subtopic}); - throw se; - } - - /* - * If a token contains the SUBTOPIC_WILDCARD, that token may not - * contain any additional characters. - * I.e. chat.* is OK, chat.f* is not OK (assuming a separator of '.'). - */ - - StringTokenizer tokenizer = new StringTokenizer(subtopic, separator); - while (tokenizer.hasMoreTokens()) - { - String token = tokenizer.nextToken(); - if (token.indexOf(SUBTOPIC_WILDCARD) != -1) - { - if (!token.equals(SUBTOPIC_WILDCARD)) - { - ServiceException se = new ServiceException(); - se.setMessage(10554, new Object[] {subtopic}); - throw se; - } - else - { - hasSubtopicWildcard = true; - } - } - } - } - // Non-hierarchical subtopics cannot contain subtopic wildcard unless - // that is the only value, "*", indicating a match for any subtopic. - else if (subtopic.indexOf(SUBTOPIC_WILDCARD) != -1) - { - if (!subtopic.equals(SUBTOPIC_WILDCARD)) - { - ServiceException se = new ServiceException(); - se.setMessage(10554, new Object[] {subtopic}); - throw se; - } - else - { - hasSubtopicWildcard = true; - } - } - } - - //-------------------------------------------------------------------------- - // - // Constants - // - //-------------------------------------------------------------------------- - - /** - * The wildcard token for hierarchical subtopics. - */ - public static final String SUBTOPIC_WILDCARD = "*"; - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - /** - * The full subtopic value. - */ - private String subtopic; - - /** - * The separator used if the subtopic is hierarchical. - */ - private String separator; - - /** - * Flag to store whether the subtopic is hierarchical. - */ - private boolean hierarchical; - - /** - * Flag to store whether the subtopic contains subtopic wildcards. - */ - private boolean hasSubtopicWildcard; - - //-------------------------------------------------------------------------- - // - // Methods - // - //-------------------------------------------------------------------------- - - /** - * Returns true if the subtopic contains a hierarchical subtopic wildcard. - * - * @return true if the subtopic contains a hierarchical subtopic wildcard, - * otherwise false. - */ - public boolean containsSubtopicWildcard() - { - return hasSubtopicWildcard; - } - - /** - * Override of equals. - * - * @param other The object to compare against. - * @return <code>true</code> if other equals to this; <code>false</code> otherwise; - */ - public boolean equals(Object other) - { - if (!(other instanceof Subtopic)) - return false; - Subtopic otherSubtopic = (Subtopic) other; - return subtopic.equals(otherSubtopic.subtopic) && - ((separator == null && otherSubtopic.separator == null) || (separator != null && separator.equals(otherSubtopic.separator))); - } - - /** - * Returns the separator used to create this Subtopic instance. - * This value may be <code>null</code>. - * - * @return The separator used to create this Subtopic instance. - */ - public String getSeparator() - { - return separator; - } - - /** - * Returns the subtopic value used to create this Subtopic instance. - * - * @return The subtopic value used to create this Subtopic instance. - */ - public String getValue() - { - return subtopic; - } - - /** - * Override of hashCode. Hash using the subtopic String rather than the object's address. - * - * @return The hashCode. - */ - public int hashCode() - { - return subtopic.hashCode(); - } - - /** - * Returns true is the subtopic is hierarchical. - * - * @return true if the subtopic is hierarchical, otherwise false. - */ - public boolean isHierarchical() - { - return hierarchical; - } - - /** - * Matches the passed subtopic against this subtopic. - * If neither subtopic contains a wildcard they must literally match. - * If one or the other contains a wildcard they may match. - * "chatrooms.*" will match "chatrooms.lobby" or "chatrooms.us.ca" but will - * not match "chatrooms" (assuming a subtopic separator of "."). - * "chatrooms.*.ca" will match "chatrooms.us.ca" but not "chatrooms.us.ma". - * - * @param other The other subtopic to match against this subtopic. - * @return true if they match, otherwise false. - */ - public boolean matches(Subtopic other) - { - // If neither contain a wildcard, match them as simple Strings. - if (!hasSubtopicWildcard && !other.hasSubtopicWildcard) - { - return (subtopic.equals(other.subtopic)) ? true : false; - } - // Otherwise, this subtopic or the other contains a wildcard. - else - { - // If both are hierarchical but use different separators - don't match. - if (hierarchical && other.hierarchical && !separator.equals(other.separator)) - return false; - - StringTokenizer t1 = new StringTokenizer(subtopic, separator); - StringTokenizer t2 = new StringTokenizer(other.subtopic, other.separator); - int n = t1.countTokens(); - int difference = n - t2.countTokens(); - - String tok1 = null; - String tok2 = null; - boolean matchToken; - while (n-- > 0) - { - tok1 = t1.nextToken(); - matchToken = !tok1.equals(SUBTOPIC_WILDCARD); - - if (t2.hasMoreTokens()) - { - tok2 = t2.nextToken(); - if (tok2.equals(SUBTOPIC_WILDCARD)) - continue; - } - else - { - break; // No more tokens to compare. - } - - if (matchToken && !tok1.equals(tok2)) - return false; - } - - if (difference == 0) - return true; - else if ((difference < 0) && tok1.equals(SUBTOPIC_WILDCARD)) - return true; - else return (difference > 0) && tok2.equals(SUBTOPIC_WILDCARD); - } - } - - /** - * Override of toString. - * - * @return The subtopic string. - */ - public String toString() - { - return subtopic; - } - -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/ThrottleManager.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/ThrottleManager.java b/core/src/flex/messaging/services/messaging/ThrottleManager.java deleted file mode 100644 index 499c5f0..0000000 --- a/core/src/flex/messaging/services/messaging/ThrottleManager.java +++ /dev/null @@ -1,556 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging; - -import java.util.HashMap; -import java.util.Map; - -import flex.management.ManageableComponent; -import flex.management.runtime.messaging.services.messaging.ThrottleManagerControl; -import flex.messaging.MessageException; -import flex.messaging.config.ConfigurationException; -import flex.messaging.config.ThrottleSettings; -import flex.messaging.config.ThrottleSettings.Policy; -import flex.messaging.log.Log; -import flex.messaging.log.LogCategories; -import flex.messaging.messages.Message; -import flex.messaging.services.messaging.ThrottleManager.ThrottleResult.Result; - -/** - * - * - * The ThrottleManager provides functionality to limit the frequency of messages - * routed through the system in message/second terms. Message frequency can be managed - * on a per-client basis and also on a per-destination basis by tweaking different - * parameters. Each MessageDestination has one ThrottleManager. - * - * Message frequency can be throttled differently for incoming messages, which are messages - * published by Flash/Flex producers, and for outgoing messages, which are messages - * consumed by Flash/Flex subscribers that may have been produced by either Flash clients - * or external message producers (such as data feeds, JMS publishers, etc). - * - */ -public class ThrottleManager extends ManageableComponent -{ - //-------------------------------------------------------------------------- - // - // Public Static Constants - // - //-------------------------------------------------------------------------- - - public static final String LOG_CATEGORY = LogCategories.TRANSPORT_THROTTLE; - public static final String TYPE = "ThrottleManager"; - - //-------------------------------------------------------------------------- - // - // Private Static Constants - // - //-------------------------------------------------------------------------- - - private static final Object classMutex = new Object(); - - //-------------------------------------------------------------------------- - // - // Private Static Variables - // - //-------------------------------------------------------------------------- - - private static int instanceCount = 0; - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an unmanaged <code>ThrottleManager</code> instance. - */ - public ThrottleManager() - { - this(false); - } - - /** - * Constructs a <code>ThrottleManager</code> with the indicated management. - * - * @param enableManagement <code>true</code> if the <code>ThrottleManager</code> - * is manageable; otherwise <code>false</code>. - */ - public ThrottleManager(boolean enableManagement) - { - super(enableManagement); - synchronized (classMutex) - { - super.setId(TYPE + ++instanceCount); - } - } - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - protected ThrottleSettings settings; - private Map<String, MessageFrequency> inboundClientMarks; - private MessageFrequency inboundDestinationMark; - private MessageFrequency outboundDestinationMark; - - //-------------------------------------------------------------------------- - // - // Initialize, validate, start, and stop methods. - // - //-------------------------------------------------------------------------- - - /** - * Starts the throttle manager. - */ - @Override - public void start() - { - // Use the default ThrottleSettings if one is not set already. - if (settings == null) - settings = new ThrottleSettings(); - - if (settings.isDestinationThrottleEnabled()) - { - inboundDestinationMark = new MessageFrequency(settings.getIncomingDestinationFrequency()); - outboundDestinationMark = new MessageFrequency(settings.getOutgoingDestinationFrequency()); - } - - if (settings.isInboundClientThrottleEnabled()) - inboundClientMarks = new HashMap<String, MessageFrequency>(); - } - - - /** - * Stops the throttle manager. - */ - @Override - public void stop() - { - super.stop(); - - // Remove management. - if (isManaged() && getControl() != null) - { - getControl().unregister(); - setControl(null); - setManaged(false); - } - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Given a policy, returns the result for that policy. - * - * @param policy The policy. - * @return The result for the policy. - */ - public static Result getResult(Policy policy) - { - if (Policy.IGNORE == policy) - return Result.IGNORE; - else if (Policy.ERROR == policy) - return Result.ERROR; - else if (Policy.BUFFER == policy) - return Result.BUFFER; - else if (Policy.CONFLATE == policy) - return Result.CONFLATE; - return Result.OK; - } - - /** - * Returns the outbound policy being used by the throttle manager. - * - * @return The outbound policy for the throttle manager. - */ - public Policy getOutboundPolicy() - { - return settings == null? null : settings.getOutboundPolicy(); - } - - - /** - * This is a no-op because throttle manager's id is generated internally. - * - * @param id The id. - */ - @Override - public void setId(String id) - { - // No-op - } - - /** - * - * Used by the MessageClient in its cleanup process. - * - * @param clientId The id of the MessageClient. - */ - public void removeClientThrottleMark(Object clientId) - { - if (inboundClientMarks != null) - inboundClientMarks.remove(clientId); - // Note that the outBoundClientMarks that is maintained by the FlexClientOutboundQueueProcessor - // is cleaned up by FlexClient when MessageClient is unregistered with it. - } - - /** - * Sets the throttling settings of the throttle manager. - * - * @param throttleSettings The throttling settings for the throttle manager. - */ - public void setThrottleSettings(ThrottleSettings throttleSettings) - { - // Make sure that we have valid outbound policies. - Policy outPolicy = throttleSettings.getOutboundPolicy(); - if (outPolicy != Policy.NONE && outPolicy != Policy.IGNORE) - { - ConfigurationException ex = new ConfigurationException(); - ex.setMessage("Invalid outbound throttle policy '" + outPolicy - + "' for destination '" + throttleSettings.getDestinationName() - + "'. Valid values are 'NONE' and 'IGNORE'."); - throw ex; - } - settings = throttleSettings; - } - - /** - * Attempts to throttle the incoming message at the destination and the client level. - * - * @param message Message to be throttled. - * @return True if the message was throttled; otherwise false. - */ - public boolean throttleIncomingMessage(Message message) - { - // destination-level throttling comes before client-level, because if it - // fails then it doesn't matter what the client-level throttle reports. - ThrottleResult throttleResult = throttleDestinationLevel(message, true); - if (throttleResult.getResult() == Result.OK) - { - // client-level throttling allows the system to further refine a - // different throttle for individual clients, which may be a subset - // but never a superset of destination-level throttle settings - throttleResult = throttleIncomingClientLevel(message); - handleIncomingThrottleResult(message, throttleResult, true /*isClientLevel*/); - boolean throttled = throttleResult.getResult() != Result.OK; - if (!throttled) - { - updateMessageFrequencyDestinationLevel(true /* incoming */); - updateMessageFrequencyIncomingClientLevel(message); - } - return throttled; - } - - handleIncomingThrottleResult(message, throttleResult, false /*isClientLevel*/); - boolean throttled = throttleResult.getResult() != Result.OK; - if (!throttled) - { - updateMessageFrequencyDestinationLevel(true /* incoming */); - updateMessageFrequencyIncomingClientLevel(message); - } - return throttled; - } - - /** - * Attempts to throttle the outgoing message at the destination level only. - * Client level throttling is enforced at FlexClientOutboundQueueProcessor. - * - * @param message The message to be throttled. - * @return The result of throttling attempt. - */ - public ThrottleResult throttleOutgoingMessage(Message message) - { - ThrottleResult throttleResult = throttleDestinationLevel(message, false); - // Outbound client-level throttling happens in FlexClientOutboundQueueProcessor. - handleOutgoingThrottleResult(message, throttleResult, false /*isClientLevel*/); - return throttleResult; - } - - /** - * A utility method to handle outgoing throttling results in a common way. - * - * @param message The message that is being throttled. - * @param throttleResult The throttling result. - * @param isClientLevel Whether the message is being throttled at the client level - * or not. - */ - public void handleOutgoingThrottleResult(Message message, ThrottleResult throttleResult, boolean isClientLevel) - { - Result result = throttleResult.getResult(); - - // Update the management metrics. - if (result != Result.OK && isManaged()) - { - if (isClientLevel) - ((ThrottleManagerControl)getControl()).incrementClientOutgoingMessageThrottleCount(); - else - ((ThrottleManagerControl)getControl()).incrementDestinationOutgoingMessageThrottleCount(); - } - - // Result can only be IGNORE (or NONE which means no throttling) - if (result == Result.IGNORE) - { - // Improve the detail message for IGNORE. - if (isClientLevel) - throttleResult.setDetail("Message '" + message.getMessageId() + "' ignored: Too many messages sent to client '" - + message.getClientId() + "' in too small of a time interval " + throttleResult.getDetail()); - else - throttleResult.setDetail("Message '" + message.getMessageId() + "' throttled: Too many messages routed by destination '" - + message.getDestination() + "' in too small of a time interval " + throttleResult.getDetail()); - - if (Log.isInfo()) - Log.getLogger(LOG_CATEGORY).info(throttleResult.getDetail()); - } - } - - /** - * Attempts to throttle destination-level incoming and outgoing messages. - * - * @param message Message to throttle. - * @param incoming Whether the message is incoming or outgoing. - * @return The result of the throttling attempt. - */ - public ThrottleResult throttleDestinationLevel(Message message, boolean incoming) - { - if (incoming && settings.isInboundDestinationThrottleEnabled()) - { - ThrottleResult result = inboundDestinationMark.checkLimit(settings.getIncomingDestinationFrequency(), settings.getInboundPolicy()); - return result; - } - else if (!incoming && settings.isOutboundDestinationThrottleEnabled()) - { - ThrottleResult result = outboundDestinationMark.checkLimit(settings.getOutgoingDestinationFrequency(), settings.getOutboundPolicy()); - return result; - } - // Return the default OK result. - return new ThrottleResult(); - } - - /** - * Updates the destination level message frequency. - * - * param incoming Whether the message is incoming or outgoing. - */ - public void updateMessageFrequencyDestinationLevel(boolean incoming) - { - if (incoming && settings.isInboundDestinationThrottleEnabled()) - inboundDestinationMark.updateMessageFrequency(); - else if (!incoming && settings.isOutboundDestinationThrottleEnabled()) - outboundDestinationMark.updateMessageFrequency(); - } - - /** - * Updates the incoming client level message frequency. - */ - public void updateMessageFrequencyIncomingClientLevel(Message message) - { - String clientId = (String)message.getClientId(); - if (settings.isInboundClientThrottleEnabled()) - { - MessageFrequency clientLevelMark = inboundClientMarks.get(clientId); - if (clientLevelMark != null) - clientLevelMark.updateMessageFrequency(); - } - } - - //-------------------------------------------------------------------------- - // - // Protected and private methods. - // - //-------------------------------------------------------------------------- - - /** - * Returns the log category for the throttle manager. - */ - @Override - protected String getLogCategory() - { - return LOG_CATEGORY; - } - - /** - * A utility method to handle incoming throttling results in a common way. - * - * @param message The message that is being throttled. - * @param throttleResult The throttling result. - * @param isClientLevel Whether the message is being throttled at the client level - * or not. - */ - protected void handleIncomingThrottleResult(Message message, ThrottleResult throttleResult, boolean isClientLevel) - { - Result result = throttleResult.getResult(); - - // Update the management metrics. - if (result != Result.OK && isManaged()) - { - if (isClientLevel) - ((ThrottleManagerControl)getControl()).incrementClientIncomingMessageThrottleCount(); - else - ((ThrottleManagerControl)getControl()).incrementDestinationIncomingMessageThrottleCount(); - } - - // Result can be IGNORE or ERROR (or NONE which means no throttling). - if (result == Result.IGNORE || result == Result.ERROR) - { - if (isClientLevel) - throttleResult.setDetail("Message '" + message.getMessageId() + "' throttled: Too many messages sent by the client '" - + message.getClientId() + "' in too small of a time interval " + throttleResult.getDetail()); - else - throttleResult.setDetail("Message '" + message.getMessageId() + "' throttled: Too many messages sent to destination '" - + message.getDestination() + "' in too small of a time interval " + throttleResult.getDetail()); - - String detail = throttleResult.getDetail(); - if (result == Result.ERROR) - { - if (Log.isError()) - Log.getLogger(LOG_CATEGORY).error(detail); - // And, throw an exception, so the client gets the error. - MessageException me = new MessageException(detail); - throw me; - } - // Otherwise, just log it. - if (Log.isInfo()) - Log.getLogger(LOG_CATEGORY).info(detail); - } - } - - /** - * Attempts to throttle client-level incoming messages only. Client-level - * outgoing messages are throttled at the FlexClientOutboundQueueProcessor. - * - * @param message Message to throttle. - * @return The result of the throttling attempt. - */ - protected ThrottleResult throttleIncomingClientLevel(Message message) - { - String clientId = (String)message.getClientId(); - if (settings.isInboundClientThrottleEnabled()) - { - MessageFrequency clientLevelMark; - clientLevelMark = inboundClientMarks.get(clientId); - if (clientLevelMark == null) - clientLevelMark = new MessageFrequency(settings.getIncomingClientFrequency()); - - ThrottleResult result = clientLevelMark.checkLimit(settings.getIncomingClientFrequency(), settings.getInboundPolicy()); - inboundClientMarks.put(clientId, clientLevelMark); - return result; - } - // Return the default OK result. - return new ThrottleResult(); - } - - //-------------------------------------------------------------------------- - // - // Nested Classes - // - //-------------------------------------------------------------------------- - - /** - * This class is used to keep track of throttling results. - */ - public static class ThrottleResult - { - /** - * Result enum. - */ - public enum Result - { - OK, IGNORE, ERROR, BUFFER, CONFLATE - } - - private String detail; - private Result result; - - /** - * Creates a ThrottleResult with Result.OK. - */ - public ThrottleResult() - { - this(Result.OK); - } - - /** - * Creates a ThrottleResult with the passed in Result. - * - * @param result The Result. - */ - public ThrottleResult(Result result) // FIXME - { - this.result = result; - } - - /** - * Creates a ThrottleResult with the passed in Result and detail. - * - * @param result The Result. - * @param detail The detail. - */ - public ThrottleResult(Result result, String detail) // FIXME - { - this(result); - this.detail = detail; - } - - /** - * Returns the detail. - * - * @return The detail. - */ - public String getDetail() - { - return detail; - } - - /** - * Sets the detail. - * - * @param detail The detail. - */ - public void setDetail(String detail) - { - this.detail = detail; - } - - /** - * Returns the result. - * - * @return The result. - */ - public Result getResult() - { - return result; - } - - /** - * Sets the result. - * - * @param result The result. - */ - public void setResult(Result result) - { - this.result = result; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/ActionScriptAdapter.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/ActionScriptAdapter.java b/core/src/flex/messaging/services/messaging/adapters/ActionScriptAdapter.java deleted file mode 100644 index fb00d0c..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/ActionScriptAdapter.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import java.util.Set; - -import flex.messaging.services.MessageService; -import flex.messaging.services.messaging.SubscriptionManager; -import flex.messaging.messages.Message; -import flex.messaging.Destination; -import flex.messaging.MessageDestination; -import flex.management.runtime.messaging.services.messaging.adapters.ActionScriptAdapterControl; - -/** - * An ActionScript object based adapter for the MessageService - * that supports simple publish/subscribe messaging between - * ActionScript based clients. - */ -public class ActionScriptAdapter extends MessagingAdapter -{ - private ActionScriptAdapterControl controller; - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs a default <code>ActionScriptAdapter</code>. - */ - public ActionScriptAdapter() - { - super(); - } - - //-------------------------------------------------------------------------- - // - // Public Getters and Setters for ServiceAdapter properties - // - //-------------------------------------------------------------------------- - - /** - * Casts the <code>Destination</code> into <code>MessageDestination</code> - * and calls super.setDestination. - * - * @param destination - */ - public void setDestination(Destination destination) - { - Destination dest = (MessageDestination)destination; - super.setDestination(dest); - } - - //-------------------------------------------------------------------------- - // - // Other Public APIs - // - //-------------------------------------------------------------------------- - - /** - * Handle a data message intended for this adapter. - */ - public Object invoke(Message message) - { - MessageDestination destination = (MessageDestination)getDestination(); - MessageService msgService = (MessageService)destination.getService(); - - SubscriptionManager subscriptionManager = destination.getSubscriptionManager(); - Set subscriberIds = subscriptionManager.getSubscriberIds(message, true /*evalSelector*/); - if (subscriberIds != null && !subscriberIds.isEmpty()) - { - /* We have already filtered based on the selector and so pass false below */ - msgService.pushMessageToClients(destination, subscriberIds, message, false); - } - msgService.sendPushMessageFromPeer(message, destination, true); - - return null; - } - - /** - * Invoked automatically to allow the <code>ActionScriptAdapter</code> to setup its corresponding - * MBean control. - * - * @param broker The <code>Destination</code> that manages this <code>ActionScriptAdapter</code>. - */ - protected void setupAdapterControl(Destination destination) - { - controller = new ActionScriptAdapterControl(this, destination.getControl()); - controller.register(); - setControl(controller); - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/AsyncMessageReceiver.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/services/messaging/adapters/AsyncMessageReceiver.java b/core/src/flex/messaging/services/messaging/adapters/AsyncMessageReceiver.java deleted file mode 100644 index fb1c0f4..0000000 --- a/core/src/flex/messaging/services/messaging/adapters/AsyncMessageReceiver.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.services.messaging.adapters; - -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; - -/** - * A <code>MessageReceiver</code> that receives messages asynchronously from JMS. - * - * - */ -class AsyncMessageReceiver implements MessageReceiver, ExceptionListener, MessageListener -{ - private JMSConsumer jmsConsumer; - - /** - * Constructs a new AsyncMessageReceiver. - * - * @param jmsConsumer JMSConsumer associated with the AsyncMessageReceiver. - */ - public AsyncMessageReceiver(JMSConsumer jmsConsumer) - { - this.jmsConsumer = jmsConsumer; - } - - /** - * Implements MessageReceiver.startReceive. - */ - public void startReceive() throws JMSException - { - jmsConsumer.setMessageListener(this); - } - - /** - * Implements MessageReceiver.stopReceive. - */ - public void stopReceive() - { - // Nothing to do. - } - - /** - * Implements javax.jms.ExceptionListener.onException. - * - * @param exception JMS exception received from the JMS server. - */ - public void onException(JMSException exception) - { - jmsConsumer.onException(exception); - } - - /** - * Implements javax.jms.MessageListener.onMessage. - * - * @param message JMS message received from the JMS server. - */ - public void onMessage(Message message) - { - jmsConsumer.onMessage(message); - } -}
