http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/FlexClientBindingEvent.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientBindingEvent.java b/core/src/flex/messaging/client/FlexClientBindingEvent.java deleted file mode 100644 index 3fa20de..0000000 --- a/core/src/flex/messaging/client/FlexClientBindingEvent.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -/** - * Event used to notify FlexClientAttributeListeners of changes to FlexClient - * attributes. - */ -public class FlexClientBindingEvent -{ - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs an event for an attribute that is bound or unbound from a FlexClient. - * - * @param client The FlexClient. - * @param name The attribute name. - */ - public FlexClientBindingEvent(FlexClient client, String name) - { - this.client = client; - this.name = name; - } - - - /** - * Constructs an event for an attribute that is added to a FlexClient or - * replaced by a new value. - * - * @param client The FlexClient. - * @param name The attribute name. - * @param value The attribute value. - */ - public FlexClientBindingEvent(FlexClient client, String name, Object value) - { - this.client = client; - this.name = name; - this.value = value; - } - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - /** - * The FlexClient that generated the event. - */ - private FlexClient client; - - /** - * The name of the attribute associated with the event. - */ - private String name; - - /** - * The value of the attribute associated with the event. - */ - private Object value; - - //-------------------------------------------------------------------------- - // - // Methods - // - //-------------------------------------------------------------------------- - - /** - * Returns the FlexClient that generated the event. - * - * @return The FlexClient that generated the event. - */ - public FlexClient getClient() - { - return client; - } - - /** - * Returns the name of the attribute associated with the event. - * - * @return The name of the attribute associated with the event. - */ - public String getName() - { - return name; - } - - /** - * Returns the value of the attribute associated with the event. - * - * @return The value of the attribute associated with the event. - */ - public Object getValue() - { - return value; - } -}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/FlexClientBindingListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientBindingListener.java b/core/src/flex/messaging/client/FlexClientBindingListener.java deleted file mode 100644 index 50c319f..0000000 --- a/core/src/flex/messaging/client/FlexClientBindingListener.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -/** - * Interface to be notified when the implementing object is bound or unbound from the FlexClient. - */ -public interface FlexClientBindingListener -{ - /** - * Callback invoked when the object is bound to a FlexClient. - * - * @param event The event containing the FlexClient and attribute - * information. - */ - void valueBound(FlexClientBindingEvent event); - - /** - * Callback invoked when the object is unbound from a FlexClient. - * - * @param event The event containing the FlexClient and attribute - * information. - */ - void valueUnbound(FlexClientBindingEvent event); -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/FlexClientListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientListener.java b/core/src/flex/messaging/client/FlexClientListener.java deleted file mode 100644 index 7c8cd3d..0000000 --- a/core/src/flex/messaging/client/FlexClientListener.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -/** - * Interface to be notified when a FlexClient is created or destroyed. Implementations of this interface - * may add themselves as created listeners statically via <code>FlexClient.addClientCreatedListener()</code>. - * To listen for FlexClient destruction, the implementation instance must add itself as a listener to - * a specific FlexClient instance via the <code>addClientDestroyedListener()</code> method. - */ -public interface FlexClientListener -{ - /** - * Notification that a FlexClient was created. - * - * @param client The FlexClient that was created. - */ - void clientCreated(FlexClient client); - - /** - * Notification that a FlexClient is about to be destroyed. - * - * @param client The FlexClient that will be destroyed. - */ - void clientDestroyed(FlexClient client); -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/FlexClientManager.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientManager.java b/core/src/flex/messaging/client/FlexClientManager.java deleted file mode 100644 index 943956d..0000000 --- a/core/src/flex/messaging/client/FlexClientManager.java +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadFactory; - -import flex.management.ManageableComponent; -import flex.management.runtime.messaging.client.FlexClientManagerControl; -import flex.messaging.FlexContext; -import flex.messaging.MessageBroker; -import flex.messaging.MessageException; -import flex.messaging.config.FlexClientSettings; -import flex.messaging.endpoints.AbstractEndpoint; -import flex.messaging.endpoints.Endpoint; -import flex.messaging.log.Log; -import flex.messaging.log.LogCategories; -import flex.messaging.util.ClassUtil; -import flex.messaging.util.TimeoutAbstractObject; -import flex.messaging.util.TimeoutManager; - -/** - * - * Manages FlexClient instances for a MessageBroker. - */ -public class FlexClientManager extends ManageableComponent -{ - public static final String TYPE = "FlexClientManager"; - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * - */ - public FlexClientManager() - { - this(MessageBroker.getMessageBroker(null)); - } - /** - * Constructs a FlexClientManager for the passed MessageBroker. - * - * @param broker The MessageBroker that the Flex client manager is associated with. - */ - public FlexClientManager(MessageBroker broker) - { - this(broker.isManaged(), broker); - } - - /** - * - */ - public FlexClientManager(boolean enableManagement, MessageBroker mbroker) - { - super(enableManagement); - - super.setId(TYPE); - - // Ensure that we have a message broker: - broker = (mbroker != null) ? mbroker : MessageBroker.getMessageBroker(null); - - FlexClientSettings flexClientSettings = broker.getFlexClientSettings(); - if (flexClientSettings != null && flexClientSettings.getTimeoutMinutes() != -1) - { - // Convert from minutes to millis. - setFlexClientTimeoutMillis(flexClientSettings.getTimeoutMinutes()*60*1000); - } - - this.setParent(broker); - } - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - /** - * The MessageBroker that owns this manager. - */ - private final MessageBroker broker; - - /** - * The Mbean controller for this manager. - */ - private FlexClientManagerControl controller; - - /** - * Table to store FlexClients by id. - */ - private final Map<String,FlexClient> flexClients = new ConcurrentHashMap<String,FlexClient>(); - - - /** - * Manages time outs for FlexClients. - * This currently includes timeout of FlexClient instances, timeouts for async - * long-poll handling, and scheduling delayed flushes of outbound messages. - */ - private volatile TimeoutManager flexClientTimeoutManager; - - //-------------------------------------------------------------------------- - // - // Properties - // - //-------------------------------------------------------------------------- - - //---------------------------------- - // clientIds - //---------------------------------- - - /** - * Returns a string array of the client IDs. - * - * @return A string array of the client IDs. - */ - public String[] getClientIds() - { - String[] ids = new String[flexClients.size()]; - ArrayList<String> idList = new ArrayList<String>(flexClients.keySet()); - - for (int i = 0; i < flexClients.size(); i++) - ids[i] = idList.get(i); - - return ids; - } - - //---------------------------------- - // flexClientCount - //---------------------------------- - - /** - * Returns the number of FlexClients in use. - * - * @return The number of FlexClients in use. - */ - public int getFlexClientCount() - { - return flexClients.size(); - } - - //---------------------------------- - // flexClientTimeoutMillis - //---------------------------------- - - private volatile long flexClientTimeoutMillis; - - /** - * Returns the idle timeout in milliseconds to apply to new FlexClient instances. - * - * @return The idle timeout in milliseconds to apply to new FlexClient instances. - */ - public long getFlexClientTimeoutMillis() - { - return flexClientTimeoutMillis; - } - - /** - * Sets the idle timeout in milliseconds to apply to new FlexClient instances. - * - * @param value The idle timeout in milliseconds to apply to new FlexClient instances. - */ - public void setFlexClientTimeoutMillis(long value) - { - if (value < 1) - value = 0; - - synchronized (this) - { - flexClientTimeoutMillis = value; - } - } - - //---------------------------------- - // messageBroker - //---------------------------------- - - /** - * Returns the MessageBroker instance that owns this FlexClientManager. - * - * @return The parent MessageBroker instance. - */ - public MessageBroker getMessageBroker() - { - return broker; - } - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Get FlexClient with the specified id or a new one will be created. - * This method will return a valid existing FlexClient for the specific Id, - * or a new FlexClient will created - * @param id The id of the Flex client. - * @return FlexClient the FlexClient with the specified id - */ - public FlexClient getFlexClient(String id) - { - return getFlexClient(id, true); - } - - /** - * Get the FlexClient with the specified id. - * - * @param id The id of the Flex client. - * @param createNewIfNotExist if true, a new FlexClient will be created if not exist - * @return FlexClient the FlexClient with the specified id - */ - public FlexClient getFlexClient(String id, boolean createNewIfNotExist) - { - FlexClient flexClient = null; - // Try to lookup an existing instance if we receive an id. - if (id != null) - { - flexClient = flexClients.get(id); - if (flexClient != null) - { - if (flexClient.isValid() && !flexClient.invalidating) - { - flexClient.updateLastUse(); - return flexClient; - } - // Invalid, remove it - it will be replaced below. - flexClients.remove(id); - } - } - // Use a manager-level lock (this) when creating/recreating a new FlexClient. - synchronized (this) - { - if (id != null) - { - flexClient = flexClients.get(id); - if (flexClient != null) - { - flexClient.updateLastUse(); - return flexClient; - } - else - { - if (!createNewIfNotExist) - { - return null; - } - } - } - - flexClient = createFlexClient(id); - checkForNullAndDuplicateId(flexClient.getId()); - flexClients.put(flexClient.getId(), flexClient); - if (flexClientTimeoutMillis > 0) - flexClientTimeoutManager.scheduleTimeout(flexClient); - flexClient.notifyCreated(); - return flexClient; - } - } - - /** - * Creates a FlexClientOutboundQueueProcessor instance and hooks it up to the passed - * FlexClient. - * - * @param flexClient The FlexClient to equip with a queue processor. - * @param endpointId The Id of the endpoint the queue processor is used for. - * @return The FlexClient with a configured queue processor. - */ - public FlexClientOutboundQueueProcessor createOutboundQueueProcessor(FlexClient flexClient, String endpointId) - { - // First, try to create a custom outbound queue processor, if one exists. - FlexClientOutboundQueueProcessor processor = createCustomOutboundQueueProcessor(flexClient, endpointId); - - // If no custom processor, then try to create default queue processor. - if (processor == null) - processor = createDefaultOutboundQueueProcessor(flexClient, endpointId); - - // If MessageBroker's default queue processor fails, use the default processor. - if (processor == null) - { - processor = new FlexClientOutboundQueueProcessor(); - processor.setFlexClient(flexClient); - processor.setEndpointId(endpointId); - } - - return processor; - } - - /** - * - * Monitors an async poll for a FlexClient for timeout. - * - * @param asyncPollTimeout The async poll task to monitor for timeout. - */ - public void monitorAsyncPollTimeout(TimeoutAbstractObject asyncPollTimeout) - { - flexClientTimeoutManager.scheduleTimeout(asyncPollTimeout); - } - - /** - * - * Monitors a scheduled flush for a FlexClient for timeout. - * - * @param scheduledFlushTimeout The schedule flush task to monitor for timeout. - */ - public void monitorScheduledFlush(TimeoutAbstractObject scheduledFlushTimeout) - { - flexClientTimeoutManager.scheduleTimeout(scheduledFlushTimeout); - } - - /** - * Starts the Flex client manager. - * - * @see flex.management.ManageableComponent#start() - */ - @Override - public void start() - { - if (isManaged()) - { - controller = new FlexClientManagerControl(getParent().getControl(), this); - setControl(controller); - controller.register(); - } - - final String baseId = getId(); - flexClientTimeoutManager = new TimeoutManager(new ThreadFactory() - { - int counter = 1; - public synchronized Thread newThread(Runnable runnable) - { - Thread t = new Thread(runnable); - t.setName(baseId + "-FlexClientTimeoutThread-" + counter++); - return t; - } - }); - } - - /** - * @see flex.management.ManageableComponent#stop() - */ - public void stop() - { - if (controller != null) - { - controller.unregister(); - } - - if (flexClientTimeoutManager != null) - flexClientTimeoutManager.shutdown(); - } - - //-------------------------------------------------------------------------- - // - // Protected Methods - // - //-------------------------------------------------------------------------- - - /** - * Hook method invoked when a new <tt>FlexClient</tt> instance is created. - * - * @param id The id the client provided, which was previously assigned by this server, - * or another server in a cluster. New clients will pass a <code>null</code> - * value in which case this server must generate a unique id. - */ - protected FlexClient createFlexClient(String id) - { - return (id == null) ? new FlexClient(this) : new FlexClient(this, id); - } - - /* (non-Javadoc) - * @see flex.management.ManageableComponent#getLogCategory() - */ - protected String getLogCategory() - { - return LogCategories.CLIENT_FLEXCLIENT; - } - - /** - * - * Removes a FlexClient from being managed by this manager. - * This method is invoked by FlexClients when they are invalidated. - * - * @param flexClient The id of the FlexClient being invalidated. - */ - protected void removeFlexClient(FlexClient flexClient) - { - if (flexClient != null) - { - String id = flexClient.getId(); - synchronized (id) - { - FlexClient storedClient = flexClients.get(id); - // If the stored instance is the same as the invalidating instance based upon identity, - // remove it. - if (storedClient == flexClient) - flexClients.remove(id); - } - } - } - - //-------------------------------------------------------------------------- - // - // Private Methods - // - //-------------------------------------------------------------------------- - - private void checkForNullAndDuplicateId(String id) - { - if (id == null) - { - // Cannot create ''{0}'' with null id. - MessageException me = new MessageException(); - me.setMessage(10039, new Object[]{"FlexClient"}); - me.setCode("Server.Processing.NullId"); - throw me; - } - - if (flexClients.containsKey(id)) - { - // Cannot create ''{0}'' with id ''{1}''; another ''{0}'' is already registered with the same id. - MessageException me = new MessageException(); - me.setMessage(10040, new Object[]{"FlexClient", id}); - me.setCode("Server.Processing.DuplicateId"); - throw me; - } - } - - private FlexClientOutboundQueueProcessor createDefaultOutboundQueueProcessor( - FlexClient flexClient, String endpointId) - { - FlexClientSettings flexClientSettings = broker.getFlexClientSettings(); - if (flexClientSettings == null) - return null; - - String queueProcessorClassName = flexClientSettings.getFlexClientOutboundQueueProcessorClassName(); - if (queueProcessorClassName == null) - return null; - - FlexClientOutboundQueueProcessor processor = null; - try - { - Class queueProcessorClass = createClass(queueProcessorClassName); - Object instance = ClassUtil.createDefaultInstance(queueProcessorClass, null); - processor = (FlexClientOutboundQueueProcessor)instance; - processor.setFlexClient(flexClient); - processor.setEndpointId(endpointId); - processor.initialize(flexClientSettings.getFlexClientOutboundQueueProcessorProperties()); - } - catch (Throwable t) - { - String message = "Failed to create MessageBroker's outbound queue processor for FlexClient with id '" + flexClient.getId() + "'."; - if (Log.isWarn()) - Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn(message, t); - - MessageException me = new MessageException(message, t); - throw me; - } - - return processor; - } - - private FlexClientOutboundQueueProcessor createCustomOutboundQueueProcessor( - FlexClient flexClient, String endpointId) - { - FlexClientOutboundQueueProcessor processor = null; - Endpoint endpoint = broker.getEndpoint(endpointId); - if (endpoint instanceof AbstractEndpoint) - { - Class processorClass = ((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorClass(); - if (processorClass != null) - { - try - { - Object instance = ClassUtil.createDefaultInstance(processorClass, null); - if (instance instanceof FlexClientOutboundQueueProcessor) - { - processor = (FlexClientOutboundQueueProcessor)instance; - processor.setFlexClient(flexClient); - processor.setEndpointId(endpointId); - processor.initialize(((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorConfig()); - } - } - catch (Throwable t) - { - if (Log.isWarn()) - Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn("Failed to create custom outbound queue processor for FlexClient with id '" + flexClient.getId() + "'. Using MessageBroker's default queue processor.", t); - } - } - } - return processor; - } - - private Class createClass(String className) - { - Class c = ClassUtil.createClass(className, FlexContext.getMessageBroker() == null ? null : - FlexContext.getMessageBroker().getClassLoader()); - - return c; - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/FlexClientNotSubscribedException.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientNotSubscribedException.java b/core/src/flex/messaging/client/FlexClientNotSubscribedException.java deleted file mode 100644 index f5ffe18..0000000 --- a/core/src/flex/messaging/client/FlexClientNotSubscribedException.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -import flex.messaging.MessageException; -import flex.messaging.log.LogEvent; - -/** - * - */ -public class FlexClientNotSubscribedException extends MessageException -{ - /** - * - */ - private static final long serialVersionUID = 773524927178340950L; - - //-------------------------------------------------------------------------- - // - // Properties - // - //-------------------------------------------------------------------------- - - //---------------------------------- - // defaultLogMessageIntro - //---------------------------------- - - /** - * Overrides the intro text for the log message. - */ - public String getDefaultLogMessageIntro() - { - return "FlexClient not subscribed: "; - } - - //---------------------------------- - // logStackTraceEnabled - //---------------------------------- - - /** - * Override to disable stack trace logging. - */ - public boolean isLogStackTraceEnabled() - { - return false; - } - - //---------------------------------- - // peferredLogLevel - //---------------------------------- - - /** - * Override to lower the preferred log level to debug. - */ - public short getPreferredLogLevel() - { - return LogEvent.DEBUG; - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java b/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java deleted file mode 100644 index 71a5c7e..0000000 --- a/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java +++ /dev/null @@ -1,351 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import flex.messaging.Destination; -import flex.messaging.MessageClient; -import flex.messaging.MessageDestination; -import flex.messaging.config.ConfigMap; -import flex.messaging.messages.Message; -import flex.messaging.services.messaging.ThrottleManager; -import flex.messaging.services.messaging.ThrottleManager.ThrottleResult; -import flex.messaging.services.messaging.ThrottleManager.ThrottleResult.Result; - -/** - * The base FlexClientOutboundQueueProcessor implementation used if a custom implementation is not - * specified. Its behavior is very simple. It adds all new messages in order to the tail - * of the outbound queue and flushes all queued messages to the network as quickly as possible. - * It also handles the outbound client-level throttling specified at the destination level. - */ -public class FlexClientOutboundQueueProcessor -{ - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - /** - * The associated FlexClient. - */ - private FlexClient client; - - /** - * The last MessageClient messages were flushed to. This is mainly for faster - * lookup. - */ - private MessageClient lastMessageClient; - - /** - * The associated endpoint's Id. - */ - private String endpointId; - - /** - * Manages throttling of outbound client level messages. - */ - protected OutboundQueueThrottleManager outboundQueueThrottleManager; - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * - * Stores the Id for the outbound queue's endpoint. - * - * @param value The Id for the outbound queue's endpoint. - */ - public void setEndpointId(String value) - { - endpointId = value; - } - - /** - * Returns the Id for the outbound queue's endpoint. - * - * @return The Id for the outbound queue's endpoint. - */ - public String getEndpointId() - { - return endpointId; - } - - /** - * - * Sets the associated FlexClient. - * - * @param value The associated FlexClient. - */ - public void setFlexClient(FlexClient value) - { - client = value; - } - - /** - * Returns the associated FlexClient. - * - * @return The associated FlexClient. - */ - public FlexClient getFlexClient() - { - return client; - } - - /** - * Returns the outbound queue throttle manager, or null if one does not exist. - * - * @return The outbound queue throttle manager. - */ - public OutboundQueueThrottleManager getOutboundQueueThrottleManager() - { - return outboundQueueThrottleManager; - } - - /** - * Utility method to initialize (if necessary) and return an outbound queue - * throttle manager. - * - * @return The outbound queue throttle manager. - */ - public OutboundQueueThrottleManager getOrCreateOutboundQueueThrottleManager() - { - if (outboundQueueThrottleManager == null) - outboundQueueThrottleManager = new OutboundQueueThrottleManager(this); - return outboundQueueThrottleManager; - } - - /** - * No-op; this default implementation doesn't require custom initialization. - * Subclasses may override to process any custom initialization properties that have been - * defined in the server configuration. - * - * @param properties A ConfigMap containing any custom initialization properties. - */ - public void initialize(ConfigMap properties) {} - - /** - * Always adds a new message to the tail of the queue. - * - * @param outboundQueue The queue of outbound messages. - * @param message The new message to add to the queue. - */ - public void add(List<Message> outboundQueue, Message message) - { - outboundQueue.add(message); - } - - /** - * Always empties the queue and returns all messages to be sent to the client. - * - * @param outboundQueue The queue of outbound messages. - * @return A FlushResult containing the messages that have been removed from the outbound queue - * to be written to the network and a wait time for the next flush of the outbound queue - * that is the default for the underlying Channel/Endpoint. - */ - public FlushResult flush(List<Message> outboundQueue) - { - return flush(null /* no client distinction */, outboundQueue); - } - - /** - * Removes all messages in the queue targeted to this specific MessageClient subscription(s) and - * returns them to be sent to the client. - * Overrides should be careful to only return messages for the specified MessageClient. - * - * @param messageClient The specific MessageClient to return messages for. - * @param outboundQueue The queue of outbound messages. - * @return A FlushResult containing the messages that have been removed from the outbound queue - * to be written to the network for this MessageClient. - */ - public FlushResult flush(MessageClient messageClient, List<Message> outboundQueue) - { - FlushResult flushResult = new FlushResult(); - List<Message> messagesToFlush = null; - - for (Iterator<Message> iter = outboundQueue.iterator(); iter.hasNext();) - { - Message message = iter.next(); - if (messageClient == null || (message.getClientId().equals(messageClient.getClientId()))) - { - if (isMessageExpired(message)) // Don't flush expired messages. - { - iter.remove(); - continue; - } - - // If no message client was explicitly provided, get the message client from - // the current message. - MessageClient messageClientForCurrentMessage = messageClient == null ? - getMessageClient(message) : messageClient; - - // First, apply the destination level outbound throttling. - ThrottleResult throttleResult = - throttleOutgoingDestinationLevel(messageClientForCurrentMessage, message, false); - Result result = throttleResult.getResult(); - - // No destination level throttling; check destination-client level throttling. - if (Result.OK == result) - { - throttleResult = throttleOutgoingClientLevel(messageClientForCurrentMessage, message, false); - result = throttleResult.getResult(); - // If no throttling, simply add the message to the list. - if (Result.OK == result) - { - updateMessageFrequencyOutgoing(messageClientForCurrentMessage, message); - if (messagesToFlush == null) - messagesToFlush = new ArrayList<Message>(); - messagesToFlush.add(message); - } - // In rest of the policies (which is NONE), simply don't - // add the message to the list. - } - iter.remove(); - } - } - - flushResult.setMessages(messagesToFlush); - return flushResult; - } - - /** - * Utility method to test whether a message has expired or not. - * Messages with a timeToLive value that is shorter than the timespan from the message's - * timestamp up to the current system time will cause this method to return true. - * If there are expired messages in the outbound queue, flush implementations - * should use this helper method to only process and return messages that have - * not yet expired. - * - * @param message The message to test for expiration. - * - * @return true if the message has a timeToLive value that has expired; otherwise false. - */ - public boolean isMessageExpired(Message message) - { - return (message.getTimeToLive() > 0 && - (System.currentTimeMillis() - message.getTimestamp()) >= message.getTimeToLive()); - } - - /** - * Attempts to throttle the outgoing message at the destination level. - * - * @param msgClient The client the message is intended for. - * @param message The message to consider to throttle. - * @param buffered Whether the message has already been buffered. In that case, - * parts of regular throttling code is skipped. - * @return The result of throttling attempt. - */ - protected ThrottleResult throttleOutgoingDestinationLevel( - MessageClient msgClient, Message message, boolean buffered) - { - ThrottleManager throttleManager = getThrottleManager(msgClient); - if (throttleManager != null) - { - // In already buffered messages, don't use ThrottleManager#throttleOutgoingMessage - // to avoid regular throttling handling as the message has already been buffered. - if (buffered) - return throttleManager.throttleDestinationLevel(message, false /*incoming*/); - - // Otherwise, regular throttling. - return throttleManager.throttleOutgoingMessage(message); - } - return new ThrottleResult(); // Otherwise, return OK result. - } - - /** - * Attempts to throttle the outgoing message at the destination-client level. - * - * @param msgClient The client the message is intended for. - * @param message The message to consider to throttle. - * @param buffered Whether the message has already been buffered. In that case, - * parts of regular throttling code is skipped. - * @return The result of throttling attempt. - */ - protected ThrottleResult throttleOutgoingClientLevel(MessageClient msgClient, Message message, boolean buffered) - { - if (outboundQueueThrottleManager != null) // Means client level throttling enabled. - { - ThrottleResult throttleResult = outboundQueueThrottleManager.throttleOutgoingClientLevel(message); - if (!buffered) - { - ThrottleManager throttleManager = getThrottleManager(msgClient); - if (throttleManager != null) - throttleManager.handleOutgoingThrottleResult(message, throttleResult, true /*isClientLevel*/); - } - return throttleResult; - } - return new ThrottleResult(); // Otherwise, return OK result. - } - - /** - * Returns the message client that the message is intended to. - * - * @param message The message. - * @return The message client that the message is intended to. - */ - protected MessageClient getMessageClient(Message message) - { - // First try using the cached message client. - if (lastMessageClient != null && message.getClientId().equals(lastMessageClient.getClientId())) - { - return lastMessageClient; - } - else // Go ahead with the lookup. - { - lastMessageClient = client.getMessageClient((String)message.getClientId()); - return lastMessageClient; - } - } - - /** - * Returns the throttle manager associated with the destination the message - * is intended to. - * - * @param msgClient The message client; it can be null. - * @return The throttle manager. - */ - protected ThrottleManager getThrottleManager(MessageClient msgClient) - { - Destination destination = msgClient != null? msgClient.getDestination() : null; - return (destination != null && destination instanceof MessageDestination)? - ((MessageDestination)destination).getThrottleManager() : null; - } - - /** - * Updates the outgoing message's message frequency. - * - * @param msgClient The MessageClient that might have been passed to the flush; it can be null. - * @param message The message. - */ - protected void updateMessageFrequencyOutgoing(MessageClient msgClient, Message message) - { - // Update the destination level message frequency. - ThrottleManager throttleManager = getThrottleManager(msgClient); - if (throttleManager != null) - throttleManager.updateMessageFrequencyDestinationLevel(false /*incoming*/); - - // Update the client level message frequency. - if (outboundQueueThrottleManager != null) - outboundQueueThrottleManager.updateMessageFrequencyOutgoingClientLevel(message); - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/FlushResult.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlushResult.java b/core/src/flex/messaging/client/FlushResult.java deleted file mode 100644 index ab400fa..0000000 --- a/core/src/flex/messaging/client/FlushResult.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -import java.util.List; - -import flex.messaging.messages.Message; - -/** - * Stores the messages that should be written to the network as a result of a flush - * invocation on a FlexClient's outbound queue. - */ -public class FlushResult -{ - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs a <tt>FlushResult</tt> instance to return from a - * flush invocation on a FlexClient's outbound queue. - * This instance stores the list of messages to write over the network to - * the client as well as an optional wait time in milliseconds for when the - * next flush should be invoked. - */ - public FlushResult() {} - - //-------------------------------------------------------------------------- - // - // Properties - // - //-------------------------------------------------------------------------- - - //---------------------------------- - // messages - //---------------------------------- - - private List<Message> messages; - - /** - * Returns the messages to write to the network for this flush invocation. - * This list may be null, in which case no messages are written. - * - * @return The messages to write to the network for this flush invocation. - */ - public List<Message> getMessages() - { - return messages; - } - - /** - * Sets the messages to write to the network for this flush invocation. - * - * @param value The messages to write to the network for this flush invocation. - */ - public void setMessages(List<Message> value) - { - messages = value; - } - - //---------------------------------- - // nextFlushWaitTimeMillis - //---------------------------------- - - private int nextFlushWaitTimeMillis = 0; - - /** - * Returns the wait time in milliseconds for when the next flush invocation should occur. - * If this value is 0, the default, a delayed flush is not scheduled and the next flush will - * depend upon the underlying Channel/Endpoint. - * For client-side polling Channels the next flush invocation will happen when the client sends - * its next poll request at its regular interval. - * For client-side Channels that support direct writes to the client a flush invocation is triggered - * when the next message is added to the outbound queue. - * - * @return The wait time in milliseconds before flush is next invoked. A value of 0, the default, - * indicates that the default flush behavior for the underlying Channel/Endpoint should be - * used. - */ - public int getNextFlushWaitTimeMillis() - { - return nextFlushWaitTimeMillis; - } - - /** - * Sets the wait time in milliseconds for when the next flush invocation should occur. - * If this value is 0, the default, a delayed flush is not scheduled and the next flush will - * depend upon the underlying Channel/Endpoint. - * For client-side polling Channels the next flush invocation will happen when the client sends - * its next poll request at its regular interval. - * For client-side Channels that support direct writes to the client a flush invocation is triggered - * when the next message is added to the outbound queue. - * Negative value assignments are treated as 0. - * - * @param value The wait time in milliseconds before flush will be invoked. - */ - public void setNextFlushWaitTimeMillis(int value) - { - nextFlushWaitTimeMillis = (value < 1) ? 0 : value; - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/OutboundQueueThrottleManager.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/OutboundQueueThrottleManager.java b/core/src/flex/messaging/client/OutboundQueueThrottleManager.java deleted file mode 100644 index 24a66d0..0000000 --- a/core/src/flex/messaging/client/OutboundQueueThrottleManager.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -import java.util.concurrent.ConcurrentHashMap; - -import flex.messaging.MessageClient.SubscriptionInfo; -import flex.messaging.config.ThrottleSettings.Policy; -import flex.messaging.log.Log; -import flex.messaging.messages.Message; -import flex.messaging.services.messaging.MessageFrequency; -import flex.messaging.services.messaging.ThrottleManager; -import flex.messaging.services.messaging.ThrottleManager.ThrottleResult; -import flex.messaging.util.StringUtils; - - -/** - * Used to keep track of and limit outbound message rates of a single FlexClient queue. - * An outbound FlexClient queue can contain messages from multiple MessageClients - * across multiple destinations. It can also contain messages for multiple - * subscriptions (for each subtopic/selector) across the same destination for - * the same MessageClient. - */ -public class OutboundQueueThrottleManager -{ - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs a default outbound queue throttle manager. - * - * @param processor The outbound queue processor that is using this throttle manager. - */ - public OutboundQueueThrottleManager(FlexClientOutboundQueueProcessor processor) - { - destinationFrequencies = new ConcurrentHashMap<String, DestinationFrequency>(); - this.processor = processor; - } - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - /** - * Map of destination id and destination message frequencies. - */ - protected final ConcurrentHashMap<String, DestinationFrequency> destinationFrequencies; - - /** - * The parent queue processor of the throttle manager. - */ - protected final FlexClientOutboundQueueProcessor processor; - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Registers the destination with the outbound throttle manager. - * - * @param destinationId The id of the destination. - * @param outboundMaxClientFrequency The outbound max-client-frequency specified - * at the destination. - * @param outboundPolicy The outbound throttle policy specified at the destination. - */ - public void registerDestination(String destinationId, int outboundMaxClientFrequency, Policy outboundPolicy) - { - DestinationFrequency frequency = destinationFrequencies.get(destinationId); - if (frequency == null) - { - frequency = new DestinationFrequency(outboundMaxClientFrequency, outboundPolicy); - destinationFrequencies.putIfAbsent(destinationId, frequency); - } - } - - /** - * Registers the subscription of a client talking to a destination with the - * specified subscription info. - * - * @param destinationId The destination id. - * @param si The subscription information. - */ - public void registerSubscription(String destinationId, SubscriptionInfo si) - { - DestinationFrequency frequency = destinationFrequencies.get(destinationId); - frequency.logMaxFrequencyDuringRegistration(frequency.outboundMaxClientFrequency, si); - } - - /** - * Unregisters the subscription. - * - * @param destinationId The destination id. - * @param si The subscription information. - */ - public void unregisterSubscription(String destinationId, SubscriptionInfo si) - { - unregisterDestination(destinationId); - } - - /** - * Unregisters all subscriptions of the client under the specified destination. - * - * @param destinationId The destination id. - */ - public void unregisterAllSubscriptions(String destinationId) - { - unregisterDestination(destinationId); - } - - /** - * Attempts to throttle the outgoing message. - * - * @param message The message to consider to throttle. - * @return True if the message was throttled; otherwise false. - */ - public ThrottleResult throttleOutgoingClientLevel(Message message) - { - String destinationId = message.getDestination(); - if (isDestinationRegistered(destinationId)) - { - DestinationFrequency frequency = destinationFrequencies.get(message.getDestination()); - int maxFrequency = frequency.getMaxFrequency(message); // Limit to check against. - MessageFrequency messageFrequency = frequency.getMessageFrequency(message); // Message rate of the client. - if (messageFrequency != null) - { - ThrottleResult result = messageFrequency.checkLimit(maxFrequency, frequency.outboundPolicy); - return result; - } - } - return new ThrottleResult(); // Otherwise, return OK result. - } - - /** - * Updates the outgoing client level message frequency of the message. - * - * @param message The message. - */ - public void updateMessageFrequencyOutgoingClientLevel(Message message) - { - String destinationId = message.getDestination(); - if (isDestinationRegistered(destinationId)) - { - DestinationFrequency frequency = destinationFrequencies.get(message.getDestination()); - MessageFrequency messageFrequency = frequency.getMessageFrequency(message); - if (messageFrequency != null) - messageFrequency.updateMessageFrequency(); - } - } - - //-------------------------------------------------------------------------- - // - // Protected Methods - // - //-------------------------------------------------------------------------- - - /** - * Determines whether the destination has been registered or not. - * - * @param destinationId The destination id. - * @return True if the destination with the specified id has been registered. - */ - protected boolean isDestinationRegistered(String destinationId) - { - return destinationFrequencies.containsKey(destinationId); - } - - /** - * Unregisters the destination from the outbound throttle manager. - * - * @param destinationId The id of the destination. - */ - protected void unregisterDestination(String destinationId) - { - if (isDestinationRegistered(destinationId)) - destinationFrequencies.remove(destinationId); - } - - //-------------------------------------------------------------------------- - // - // Inner Classes - // - //-------------------------------------------------------------------------- - - /** - * Used to keep track of max-client-frequency and outgoing throttle policy - * specified at the destination. It also keeps track of outbound message - * rates of all MessageClient subscriptions across the destination. - */ - class DestinationFrequency - { - protected final int outboundMaxClientFrequency; // destination specified client limit. - protected final MessageFrequency outboundClientFrequency; - protected final Policy outboundPolicy; // destination specified policy. - - /** - * Default constructor. - * - * @param outboundMaxClientFrequency The outbound throttling max-client-frequency of the destination. - * @param outboundPolicy The outbound throttling policy of the destination. - */ - DestinationFrequency(int outboundMaxClientFrequency, Policy outboundPolicy) - { - this.outboundMaxClientFrequency = outboundMaxClientFrequency; - this.outboundPolicy = outboundPolicy; - outboundClientFrequency = new MessageFrequency(outboundMaxClientFrequency); - } - - /** - * Returns the max-client-frequency for the subscription the message is - * intended for (which is simply the max-client-frequency specified at - * the destination). - * - * @param message The message. - * - * @return The max-frequency for the subscription. - */ - int getMaxFrequency(Message message) - { - return outboundMaxClientFrequency; - } - - /** - * Given a subscription the message is intended to, returns the message - * rate frequency for that subscription. - * - * @param message The message. - * @return The message frequency for the subscription, if it exists; otherwise null. - */ - MessageFrequency getMessageFrequency(Message message) - { - return outboundClientFrequency; - } - - /** - * Utility function to log the maxFrequency being used during subscription. - * - * @param maxFrequency The maxFrequency to log. - */ - void logMaxFrequencyDuringRegistration(int maxFrequency, SubscriptionInfo si) - { - if (Log.isDebug()) - Log.getLogger(ThrottleManager.LOG_CATEGORY).debug("Outbound queue throttle manager for FlexClient '" - + processor.getFlexClient().getId() + "' is using '" + maxFrequency - + "' as the throttling limit for its subscription: " - + StringUtils.NEWLINE + si); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/PollFlushResult.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/PollFlushResult.java b/core/src/flex/messaging/client/PollFlushResult.java deleted file mode 100644 index dc8e33b..0000000 --- a/core/src/flex/messaging/client/PollFlushResult.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -/** - * Extends <tt>FlushResult</tt> and adds additional properties for controlling - * client polling behavior. - */ -public class PollFlushResult extends FlushResult -{ - //-------------------------------------------------------------------------- - // - // Properties - // - //-------------------------------------------------------------------------- - - //---------------------------------- - // avoidBusyPolling - //---------------------------------- - - private boolean avoidBusyPolling; - - /** - * Indicates whether the handling of this result should attempt to avoid - * potential busy-polling cycles. - * This will be set to <code>true</code> in the case of two clients that are both - * long-polling the server over the same session. - * - * @return <code>true</code> if the handling of this result should attempt to avoid potential - * busy-polling cycles. - */ - public boolean isAvoidBusyPolling() - { - return avoidBusyPolling; - } - - /** - * Set to <code>true</code> to signal that handling for this result should attempt to avoid - * potential busy-polling cycles. - * - * @param value <code>true</code> to signal that handling for this result should attempt to - * avoid potential busy-polling cycles. - */ - public void setAvoidBusyPolling(boolean value) - { - avoidBusyPolling = value; - } - - //---------------------------------- - // clientProcessingSuppressed - //---------------------------------- - - private boolean clientProcessingSuppressed; - - /** - * Indicates whether client processing of this result should be - * suppressed. - * This should be <code>true</code> for results generated for poll requests - * that arrive while a long-poll request from the same client is being serviced - * to avoid a busy polling cycle. - * - * @return <code>true</code> if client processing of this result should be suppressed; - * otherwise <code>false</code>. - */ - public boolean isClientProcessingSuppressed() - { - return clientProcessingSuppressed; - } - - /** - * Set to <code>true</code> to suppress client processing of this result. - * Default is <code>false</code>. - * This should be set to <code>true</code> for results generated for poll requests - * that arrive while a long-poll request from the same client is being serviced - * to avoid a busy polling cycle. - * - * @param value <code>true</code> to suppress client processing of the result. - */ - public void setClientProcessingSuppressed(boolean value) - { - clientProcessingSuppressed = value; - } -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/PollWaitListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/PollWaitListener.java b/core/src/flex/messaging/client/PollWaitListener.java deleted file mode 100644 index 7978e6f..0000000 --- a/core/src/flex/messaging/client/PollWaitListener.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -/** - * Used alongside invocations of <code>FlexClient.pollWithWait()</code> to allow calling code to - * maintain a record of the Objects being used to place waited poll requests into a wait - * state. This can be used to break the threads out of their wait state separately from the - * internal waited poll handling within <code>FlexClient</code>. - */ -public interface PollWaitListener -{ - /** - * Hook method invoked directly before a wait begins. - * - * @param notifier The <tt>Object</tt> being used to <code>wait()/notify()</code>. - */ - void waitStart(Object notifier); - - /** - * Hook method invoked directly after a wait completes. - * - * @param notifier The <tt>Object</tt> being used to <code>wait()/notify()</code>. - */ - void waitEnd(Object notifier); -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/UserAgentSettings.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/UserAgentSettings.java b/core/src/flex/messaging/client/UserAgentSettings.java deleted file mode 100644 index 217f2c7..0000000 --- a/core/src/flex/messaging/client/UserAgentSettings.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -/** - * A class to hold user agent specific properties. For example, in streaming - * endpoints, a certain number of bytes need to be written before the - * streaming connection can be used and this value is specific to user agents. - * Similarly, the number of simultaneous connections a session can have is user - * agent specific. - */ -public class UserAgentSettings -{ - /** - * The prefixes of the version token used by various browsers. - */ - public static final String USER_AGENT_ANDROID = "Android"; - public static final String USER_AGENT_CHROME = "Chrome"; - public static final String USER_AGENT_FIREFOX = "Firefox"; - public static final String USER_AGENT_FIREFOX_1 = "Firefox/1"; - public static final String USER_AGENT_FIREFOX_2 = "Firefox/2"; - public static final String USER_AGENT_MSIE = "MSIE"; - public static final String USER_AGENT_MSIE_5 = "MSIE 5"; - public static final String USER_AGENT_MSIE_6 = "MSIE 6"; - public static final String USER_AGENT_MSIE_7 = "MSIE 7"; - public static final String USER_AGENT_OPERA = "Opera"; - public static final String USER_AGENT_OPERA_8 = "Opera 8"; - // Opera 10,11 ship as User Agent Opera/9.8. - public static final String USER_AGENT_OPERA_10 = "Opera/9.8"; - public static final String USER_AGENT_SAFARI = "Safari"; - - /** - * Bytes needed to kickstart the streaming connections for IE. - */ - public static final int KICKSTART_BYTES_MSIE = 2048; - /** - * Bytes needed to kickstart the streaming connections for SAFARI. - */ - public static final int KICKSTART_BYTES_SAFARI = 512; - /** - * Bytes needs to kicksart the streaming connections for Android. - */ - public static final int KICKSTART_BYTES_ANDROID = 4010; - - /** - * The default number of persistent connections per session for various browsers. - */ - private static final int MAX_PERSISTENT_CONNECTIONS_LEGACY = 1; - public static final int MAX_PERSISTENT_CONNECTIONS_DEFAULT = 5; - private static final int MAX_PERSISTENT_CONNECTIONS_OPERA_LEGACY = 3; - private static final int MAX_PERSISTENT_CONNECTIONS_CHROME = MAX_PERSISTENT_CONNECTIONS_DEFAULT; - private static final int MAX_PERSISTENT_CONNECTIONS_FIREFOX = MAX_PERSISTENT_CONNECTIONS_DEFAULT; - private static final int MAX_PERSISTENT_CONNECTIONS_MSIE = MAX_PERSISTENT_CONNECTIONS_DEFAULT; - private static final int MAX_PERSISTENT_CONNECTIONS_OPERA = 7; - private static final int MAX_PERSISTENT_CONNECTIONS_SAFARI = 3; - - private String matchOn; - private int kickstartBytes; - private int maxPersistentConnectionsPerSession = MAX_PERSISTENT_CONNECTIONS_DEFAULT; - - /** - * Static method to retrieve pre-initialized user agents which are as follows: - * - * In Chrome 0, 1, 2, the limit is 6: - * match-on="Chrome" max-persistent-connections-per-session="5" - * - * In Firefox 1, 2, the limit is 2: - * match-on="Firefox" max-persistent-connections-per-session="1" - * - * In Firefox 3, the limit is 6: - * match-on="Firefox/3" max-persistent-connections-per-session="5" - * - * In MSIE 5, 6, 7, the limit is 2 with kickstart bytes of 2K: - * match-on="MSIE" max-persistent-connections-per-session="1" kickstart-bytes="2048" - * - * In MSIE 8, the limit is 6 with kickstart bytes of 2K: - * match-on="MSIE 8" max-persistent-connections-per-session="5" kickstart-bytes="2048" - * - * In Opera 7, 9, the limit is 4: - * match-on="Opera" max-persistent-connections-per-session="3" - * - * In Opera 8, the limit is 8: - * match-on="Opera 8" max-persistent-connections-per-session="7" - * - * In Opera 10, the limit is 8. - * match-on="Opera 10" max-persistent-connections-per-session="7" - * - * In Safari 3, 4, the limit is 4. - * match-on="Safari" max-persistent-connections-per-session="3" - * - * @param matchOn String to use match the agent. - */ - public static UserAgentSettings getAgent(String matchOn) - { - UserAgentSettings userAgent = new UserAgentSettings(); - userAgent.setMatchOn(matchOn); - - if (USER_AGENT_ANDROID.equals(matchOn)) - { - userAgent.setKickstartBytes(KICKSTART_BYTES_ANDROID); - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_SAFARI); - } - if (USER_AGENT_CHROME.equals(matchOn)) - { - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_CHROME); - } - else if (USER_AGENT_FIREFOX.equals(matchOn)) - { - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_FIREFOX); - } - else if (USER_AGENT_FIREFOX_1.equals(matchOn)) - { - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); - } - else if (USER_AGENT_FIREFOX_2.equals(matchOn)) - { - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); - } - else if (USER_AGENT_MSIE.equals(matchOn)) - { - userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE); - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_MSIE); - } - else if (USER_AGENT_MSIE_5.equals(matchOn)) - { - userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE); - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); - } - else if (USER_AGENT_MSIE_6.equals(matchOn)) - { - userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE); - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); - } - else if (USER_AGENT_MSIE_7.equals(matchOn)) - { - userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE); - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); - } - else if (USER_AGENT_OPERA.equals(matchOn)) - { - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA_LEGACY); - } - else if (USER_AGENT_OPERA_8.equals(matchOn)) - { - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA); - } - else if (USER_AGENT_OPERA_10.equals(matchOn)) - { - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA); - } - else if (USER_AGENT_SAFARI.equals(matchOn)) - { - userAgent.setKickstartBytes(KICKSTART_BYTES_SAFARI); - userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_SAFARI); - } - return userAgent; - } - - /** - * Returns the String to use to match the agent. - * - * @return The String to use to match the agent. - */ - public String getMatchOn() - { - return matchOn; - } - - /** - * Sets the String to use to match the agent. - * - * @param matchOn The String to use to match the agent. - */ - public void setMatchOn(String matchOn) - { - this.matchOn = matchOn; - } - - /** - * Returns the number of bytes needed to kickstart the streaming connections - * for the user agent. - * - * @return The number of bytes needed to kickstart the streaming connections - * for the user agent. - */ - public int getKickstartBytes() - { - return kickstartBytes; - } - - /** - * Sets the number of bytes needed to kickstart the streaming connections - * for the user agent. - * - * @param kickstartBytes The number of bytes needed to kickstart the streaming - * connections for the user agent. - */ - public void setKickstartBytes(int kickstartBytes) - { - if (kickstartBytes < 0) - kickstartBytes = 0; - this.kickstartBytes = kickstartBytes; - } - - /** - * @deprecated Use {@link UserAgentSettings#getMaxPersistentConnectionsPerSession()} instead. - */ - public int getMaxStreamingConnectionsPerSession() - { - return getMaxPersistentConnectionsPerSession(); - } - - /** - * @deprecated Use {@link UserAgentSettings#setMaxPersistentConnectionsPerSession(int)} instead. - */ - public void setMaxStreamingConnectionsPerSession(int maxStreamingConnectionsPerSession) - { - setMaxPersistentConnectionsPerSession(maxStreamingConnectionsPerSession); - } - - /** - * Returns the number of simultaneous streaming connections per session - * the user agent supports. - * - * @return The number of streaming connections per session the user agent supports. - */ - public int getMaxPersistentConnectionsPerSession() - { - return maxPersistentConnectionsPerSession; - } - - /** - * Sets the number of simultaneous streaming connections per session - * the user agent supports. - * - * @param maxStreamingConnectionsPerSession The number of simultaneous - * streaming connections per session the user agent supports. - */ - public void setMaxPersistentConnectionsPerSession(int maxStreamingConnectionsPerSession) - { - this.maxPersistentConnectionsPerSession = maxStreamingConnectionsPerSession; - } - -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/package-info.java b/core/src/flex/messaging/client/package-info.java deleted file mode 100644 index 97d5848..0000000 --- a/core/src/flex/messaging/client/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package flex.messaging.client; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/cluster/BroadcastHandler.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/cluster/BroadcastHandler.java b/core/src/flex/messaging/cluster/BroadcastHandler.java deleted file mode 100644 index f036622..0000000 --- a/core/src/flex/messaging/cluster/BroadcastHandler.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.cluster; - -import java.util.List; - -/** - * - * This interface represents a handler for a message broadcast by a Cluster. - * Clusters broadcast messages across their physical nodes, and when they - * receive those messages they locate a BroadcastHandler capable of handling - * the broadcast. - */ -public interface BroadcastHandler -{ - /** - * Handle the broadcast message. - * - * @param sender sender of the original message - * @param params any parameters need to handle the message - */ - void handleBroadcast(Object sender, List<Object> params); - - /** - * Determine whether this Handler supports a particular operation by name. - * - * @return whether or not this handler supports the named operation - * @param name name of the operation - */ - boolean isSupportedOperation(String name); -} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/cluster/Cluster.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/cluster/Cluster.java b/core/src/flex/messaging/cluster/Cluster.java deleted file mode 100644 index 0339c55..0000000 --- a/core/src/flex/messaging/cluster/Cluster.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.cluster; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.w3c.dom.Element; - -import flex.messaging.config.ConfigMap; -import flex.messaging.log.LogCategories; - -/** - * - * Base interface for cluster implementations. - */ -public abstract class Cluster -{ - /** - * Default log category for clustering. - */ - public static final String LOG_CATEGORY = LogCategories.SERVICE_CLUSTER; - - /** - * Listeners to be notified when a node is removed from the cluster. - */ - List removeNodeListeners = Collections.synchronizedList(new ArrayList()); - - /** - * Cluster properties file. - */ - Element clusterPropertiesFile; - - /** - * Specifies whether or not this is the default cluster. - */ - boolean def; - - /** - * Specifies if this cluster is enabled for URL load balancing. - */ - boolean urlLoadBalancing; - - /** - * Because destinations are the constructs which become clustered, clusters - * are identified by a unique name composed in the format - * "serviceType:destinationId". - * - * @return The unique name for the clustered destination. - * @param serviceType The name of the service for this destination. - * @param destinationName The original name of the destination. - */ - static String getClusterDestinationKey(String serviceType, String destinationName) - { - StringBuffer sb = new StringBuffer(); - sb.append(serviceType); - sb.append(':'); - sb.append(destinationName); - return sb.toString(); - } - - /** - * Add a listener for remove cluster node notification. - * - * @param listener the RemoveNodeListener to add - */ - public void addRemoveNodeListener(RemoveNodeListener listener) - { - removeNodeListeners.add(listener); - } - - /** - * Send notification to remove node listeners that a node has - * been removed from the cluster. - * - * @param address The node that was removed from the cluster. - */ - protected void sendRemoveNodeListener(Object address) - { - synchronized (removeNodeListeners) - { - for (int i = 0; i < removeNodeListeners.size(); i++) - ((RemoveNodeListener)removeNodeListeners.get(i)).removeClusterNode(address); - } - } - - /** - * Initializes the Cluster with id and the map of properties. The default - * implementation is no-op. - * - * @param id The cluster id. - * @param properties The map of properties. - */ - public void initialize(String id, ConfigMap properties) - { - // No-op. - } - - /** - * Returns the cluster properties file. - * - * @return The cluster properties file. - */ - public Element clusterPropertiesFile() - { - return clusterPropertiesFile; - } - - /** - * Sets the cluster properties file. - * - * @param value The cluster properties file. - */ - public void setClusterPropertiesFile(Element value) - { - this.clusterPropertiesFile = value; - } - - /** - * Returns true if this is the default cluster for any destination that does not - * specify a clustered destination. - * - * @return Returns true if this is the default cluster. - */ - public boolean isDefault() - { - return def; - } - - /** - * When true, this is the default cluster for any destination that does not - * specify a clustered destination. - * - * @param d true if this is the default cluster - */ - public void setDefault(boolean d) - { - this.def = d; - } - - /** - * When true, this cluster is enabled for URL load balancing. - * - * @return true if this cluster enabled for load balancing. - */ - public boolean getURLLoadBalancing() - { - return urlLoadBalancing; - } - - /** - * When true, the cluster is enabled for URL load balancing. - * - * @param u the flag to enable the URL load balancing - */ - public void setURLLoadBalancing(boolean u) - { - urlLoadBalancing = u; - } - - /** - * Shutdown the cluster. - */ - public abstract void destroy(); - - /** - * Retrieve a List of Maps, where each Map contains channel id keys - * mapped to endpoint URLs for the given service type and destination name. - * There is exactly one endpoint URL for each - * channel id. This List represents all of the known endpoint URLs - * for all of the channels in the Cluster. - * @param serviceType the service type - * @param destName the destination name - * @return List of maps of channel ids to endpoint URLs for each node in - * the cluster. - */ - public abstract List getAllEndpoints(String serviceType, String destName); - - /** - * Returns a list of all of the nodes of this cluster. - * @return List a list of member IP addresses in the cluster - */ - public abstract List getMemberAddresses(); - - /** - * Returns the local cluster node. - * @return Object the Local Address object - */ - public abstract Object getLocalAddress(); - - /** - * Broadcast a service-related operation, which usually includes a Message as a method parameter. This method - * allows a local service to process a Message and then send the Message to the services on all peer nodes - * so that they may perform the same processing. - * - * @param serviceOperation The operation to broadcast. - * @param params Parameters for the operation. - */ - public abstract void broadcastServiceOperation(String serviceOperation, Object[] params); - - /** - * Send a service-related operation in point-to-point fashion to one and only one member of the cluster. - * This is similar to the broadcastServiceOperation except that this invocation is sent to the first - * node among the cluster members that does not have the local node's address. - * - * @param serviceOperation The operation to send. - * @param params Parameters for the operation. - * @param targetAddress the target address of a remote node in the cluster - */ - public abstract void sendPointToPointServiceOperation(String serviceOperation, Object[] params, Object targetAddress); - - /** - * Add a local endpoint URL for a local channel. After doing so, broadcast the information to - * peers so that they will be aware of the URL. - * - * @param serviceType the service type of the endpoint - * @param destName the destination name - * @param channelId the Channel ID - * @param endpointUrl the endpoint URL - * @param endpointPort the endpoint port - */ - public abstract void addLocalEndpointForChannel(String serviceType, String destName, - String channelId, String endpointUrl, int endpointPort); -}
