http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientBindingEvent.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientBindingEvent.java b/core/src/flex/messaging/client/FlexClientBindingEvent.java new file mode 100644 index 0000000..3fa20de --- /dev/null +++ b/core/src/flex/messaging/client/FlexClientBindingEvent.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +/** + * Event used to notify FlexClientAttributeListeners of changes to FlexClient + * attributes. + */ +public class FlexClientBindingEvent +{ + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * Constructs an event for an attribute that is bound or unbound from a FlexClient. + * + * @param client The FlexClient. + * @param name The attribute name. + */ + public FlexClientBindingEvent(FlexClient client, String name) + { + this.client = client; + this.name = name; + } + + + /** + * Constructs an event for an attribute that is added to a FlexClient or + * replaced by a new value. + * + * @param client The FlexClient. + * @param name The attribute name. + * @param value The attribute value. + */ + public FlexClientBindingEvent(FlexClient client, String name, Object value) + { + this.client = client; + this.name = name; + this.value = value; + } + + //-------------------------------------------------------------------------- + // + // Variables + // + //-------------------------------------------------------------------------- + + /** + * The FlexClient that generated the event. + */ + private FlexClient client; + + /** + * The name of the attribute associated with the event. + */ + private String name; + + /** + * The value of the attribute associated with the event. + */ + private Object value; + + //-------------------------------------------------------------------------- + // + // Methods + // + //-------------------------------------------------------------------------- + + /** + * Returns the FlexClient that generated the event. + * + * @return The FlexClient that generated the event. + */ + public FlexClient getClient() + { + return client; + } + + /** + * Returns the name of the attribute associated with the event. + * + * @return The name of the attribute associated with the event. + */ + public String getName() + { + return name; + } + + /** + * Returns the value of the attribute associated with the event. + * + * @return The value of the attribute associated with the event. + */ + public Object getValue() + { + return value; + } +}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientBindingListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientBindingListener.java b/core/src/flex/messaging/client/FlexClientBindingListener.java new file mode 100644 index 0000000..50c319f --- /dev/null +++ b/core/src/flex/messaging/client/FlexClientBindingListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +/** + * Interface to be notified when the implementing object is bound or unbound from the FlexClient. + */ +public interface FlexClientBindingListener +{ + /** + * Callback invoked when the object is bound to a FlexClient. + * + * @param event The event containing the FlexClient and attribute + * information. + */ + void valueBound(FlexClientBindingEvent event); + + /** + * Callback invoked when the object is unbound from a FlexClient. + * + * @param event The event containing the FlexClient and attribute + * information. + */ + void valueUnbound(FlexClientBindingEvent event); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientListener.java b/core/src/flex/messaging/client/FlexClientListener.java new file mode 100644 index 0000000..7c8cd3d --- /dev/null +++ b/core/src/flex/messaging/client/FlexClientListener.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +/** + * Interface to be notified when a FlexClient is created or destroyed. Implementations of this interface + * may add themselves as created listeners statically via <code>FlexClient.addClientCreatedListener()</code>. + * To listen for FlexClient destruction, the implementation instance must add itself as a listener to + * a specific FlexClient instance via the <code>addClientDestroyedListener()</code> method. + */ +public interface FlexClientListener +{ + /** + * Notification that a FlexClient was created. + * + * @param client The FlexClient that was created. + */ + void clientCreated(FlexClient client); + + /** + * Notification that a FlexClient is about to be destroyed. + * + * @param client The FlexClient that will be destroyed. + */ + void clientDestroyed(FlexClient client); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientManager.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientManager.java b/core/src/flex/messaging/client/FlexClientManager.java new file mode 100644 index 0000000..943956d --- /dev/null +++ b/core/src/flex/messaging/client/FlexClientManager.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadFactory; + +import flex.management.ManageableComponent; +import flex.management.runtime.messaging.client.FlexClientManagerControl; +import flex.messaging.FlexContext; +import flex.messaging.MessageBroker; +import flex.messaging.MessageException; +import flex.messaging.config.FlexClientSettings; +import flex.messaging.endpoints.AbstractEndpoint; +import flex.messaging.endpoints.Endpoint; +import flex.messaging.log.Log; +import flex.messaging.log.LogCategories; +import flex.messaging.util.ClassUtil; +import flex.messaging.util.TimeoutAbstractObject; +import flex.messaging.util.TimeoutManager; + +/** + * + * Manages FlexClient instances for a MessageBroker. + */ +public class FlexClientManager extends ManageableComponent +{ + public static final String TYPE = "FlexClientManager"; + + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * + */ + public FlexClientManager() + { + this(MessageBroker.getMessageBroker(null)); + } + /** + * Constructs a FlexClientManager for the passed MessageBroker. + * + * @param broker The MessageBroker that the Flex client manager is associated with. + */ + public FlexClientManager(MessageBroker broker) + { + this(broker.isManaged(), broker); + } + + /** + * + */ + public FlexClientManager(boolean enableManagement, MessageBroker mbroker) + { + super(enableManagement); + + super.setId(TYPE); + + // Ensure that we have a message broker: + broker = (mbroker != null) ? mbroker : MessageBroker.getMessageBroker(null); + + FlexClientSettings flexClientSettings = broker.getFlexClientSettings(); + if (flexClientSettings != null && flexClientSettings.getTimeoutMinutes() != -1) + { + // Convert from minutes to millis. + setFlexClientTimeoutMillis(flexClientSettings.getTimeoutMinutes()*60*1000); + } + + this.setParent(broker); + } + + //-------------------------------------------------------------------------- + // + // Variables + // + //-------------------------------------------------------------------------- + + /** + * The MessageBroker that owns this manager. + */ + private final MessageBroker broker; + + /** + * The Mbean controller for this manager. + */ + private FlexClientManagerControl controller; + + /** + * Table to store FlexClients by id. + */ + private final Map<String,FlexClient> flexClients = new ConcurrentHashMap<String,FlexClient>(); + + + /** + * Manages time outs for FlexClients. + * This currently includes timeout of FlexClient instances, timeouts for async + * long-poll handling, and scheduling delayed flushes of outbound messages. + */ + private volatile TimeoutManager flexClientTimeoutManager; + + //-------------------------------------------------------------------------- + // + // Properties + // + //-------------------------------------------------------------------------- + + //---------------------------------- + // clientIds + //---------------------------------- + + /** + * Returns a string array of the client IDs. + * + * @return A string array of the client IDs. + */ + public String[] getClientIds() + { + String[] ids = new String[flexClients.size()]; + ArrayList<String> idList = new ArrayList<String>(flexClients.keySet()); + + for (int i = 0; i < flexClients.size(); i++) + ids[i] = idList.get(i); + + return ids; + } + + //---------------------------------- + // flexClientCount + //---------------------------------- + + /** + * Returns the number of FlexClients in use. + * + * @return The number of FlexClients in use. + */ + public int getFlexClientCount() + { + return flexClients.size(); + } + + //---------------------------------- + // flexClientTimeoutMillis + //---------------------------------- + + private volatile long flexClientTimeoutMillis; + + /** + * Returns the idle timeout in milliseconds to apply to new FlexClient instances. + * + * @return The idle timeout in milliseconds to apply to new FlexClient instances. + */ + public long getFlexClientTimeoutMillis() + { + return flexClientTimeoutMillis; + } + + /** + * Sets the idle timeout in milliseconds to apply to new FlexClient instances. + * + * @param value The idle timeout in milliseconds to apply to new FlexClient instances. + */ + public void setFlexClientTimeoutMillis(long value) + { + if (value < 1) + value = 0; + + synchronized (this) + { + flexClientTimeoutMillis = value; + } + } + + //---------------------------------- + // messageBroker + //---------------------------------- + + /** + * Returns the MessageBroker instance that owns this FlexClientManager. + * + * @return The parent MessageBroker instance. + */ + public MessageBroker getMessageBroker() + { + return broker; + } + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Get FlexClient with the specified id or a new one will be created. + * This method will return a valid existing FlexClient for the specific Id, + * or a new FlexClient will created + * @param id The id of the Flex client. + * @return FlexClient the FlexClient with the specified id + */ + public FlexClient getFlexClient(String id) + { + return getFlexClient(id, true); + } + + /** + * Get the FlexClient with the specified id. + * + * @param id The id of the Flex client. + * @param createNewIfNotExist if true, a new FlexClient will be created if not exist + * @return FlexClient the FlexClient with the specified id + */ + public FlexClient getFlexClient(String id, boolean createNewIfNotExist) + { + FlexClient flexClient = null; + // Try to lookup an existing instance if we receive an id. + if (id != null) + { + flexClient = flexClients.get(id); + if (flexClient != null) + { + if (flexClient.isValid() && !flexClient.invalidating) + { + flexClient.updateLastUse(); + return flexClient; + } + // Invalid, remove it - it will be replaced below. + flexClients.remove(id); + } + } + // Use a manager-level lock (this) when creating/recreating a new FlexClient. + synchronized (this) + { + if (id != null) + { + flexClient = flexClients.get(id); + if (flexClient != null) + { + flexClient.updateLastUse(); + return flexClient; + } + else + { + if (!createNewIfNotExist) + { + return null; + } + } + } + + flexClient = createFlexClient(id); + checkForNullAndDuplicateId(flexClient.getId()); + flexClients.put(flexClient.getId(), flexClient); + if (flexClientTimeoutMillis > 0) + flexClientTimeoutManager.scheduleTimeout(flexClient); + flexClient.notifyCreated(); + return flexClient; + } + } + + /** + * Creates a FlexClientOutboundQueueProcessor instance and hooks it up to the passed + * FlexClient. + * + * @param flexClient The FlexClient to equip with a queue processor. + * @param endpointId The Id of the endpoint the queue processor is used for. + * @return The FlexClient with a configured queue processor. + */ + public FlexClientOutboundQueueProcessor createOutboundQueueProcessor(FlexClient flexClient, String endpointId) + { + // First, try to create a custom outbound queue processor, if one exists. + FlexClientOutboundQueueProcessor processor = createCustomOutboundQueueProcessor(flexClient, endpointId); + + // If no custom processor, then try to create default queue processor. + if (processor == null) + processor = createDefaultOutboundQueueProcessor(flexClient, endpointId); + + // If MessageBroker's default queue processor fails, use the default processor. + if (processor == null) + { + processor = new FlexClientOutboundQueueProcessor(); + processor.setFlexClient(flexClient); + processor.setEndpointId(endpointId); + } + + return processor; + } + + /** + * + * Monitors an async poll for a FlexClient for timeout. + * + * @param asyncPollTimeout The async poll task to monitor for timeout. + */ + public void monitorAsyncPollTimeout(TimeoutAbstractObject asyncPollTimeout) + { + flexClientTimeoutManager.scheduleTimeout(asyncPollTimeout); + } + + /** + * + * Monitors a scheduled flush for a FlexClient for timeout. + * + * @param scheduledFlushTimeout The schedule flush task to monitor for timeout. + */ + public void monitorScheduledFlush(TimeoutAbstractObject scheduledFlushTimeout) + { + flexClientTimeoutManager.scheduleTimeout(scheduledFlushTimeout); + } + + /** + * Starts the Flex client manager. + * + * @see flex.management.ManageableComponent#start() + */ + @Override + public void start() + { + if (isManaged()) + { + controller = new FlexClientManagerControl(getParent().getControl(), this); + setControl(controller); + controller.register(); + } + + final String baseId = getId(); + flexClientTimeoutManager = new TimeoutManager(new ThreadFactory() + { + int counter = 1; + public synchronized Thread newThread(Runnable runnable) + { + Thread t = new Thread(runnable); + t.setName(baseId + "-FlexClientTimeoutThread-" + counter++); + return t; + } + }); + } + + /** + * @see flex.management.ManageableComponent#stop() + */ + public void stop() + { + if (controller != null) + { + controller.unregister(); + } + + if (flexClientTimeoutManager != null) + flexClientTimeoutManager.shutdown(); + } + + //-------------------------------------------------------------------------- + // + // Protected Methods + // + //-------------------------------------------------------------------------- + + /** + * Hook method invoked when a new <tt>FlexClient</tt> instance is created. + * + * @param id The id the client provided, which was previously assigned by this server, + * or another server in a cluster. New clients will pass a <code>null</code> + * value in which case this server must generate a unique id. + */ + protected FlexClient createFlexClient(String id) + { + return (id == null) ? new FlexClient(this) : new FlexClient(this, id); + } + + /* (non-Javadoc) + * @see flex.management.ManageableComponent#getLogCategory() + */ + protected String getLogCategory() + { + return LogCategories.CLIENT_FLEXCLIENT; + } + + /** + * + * Removes a FlexClient from being managed by this manager. + * This method is invoked by FlexClients when they are invalidated. + * + * @param flexClient The id of the FlexClient being invalidated. + */ + protected void removeFlexClient(FlexClient flexClient) + { + if (flexClient != null) + { + String id = flexClient.getId(); + synchronized (id) + { + FlexClient storedClient = flexClients.get(id); + // If the stored instance is the same as the invalidating instance based upon identity, + // remove it. + if (storedClient == flexClient) + flexClients.remove(id); + } + } + } + + //-------------------------------------------------------------------------- + // + // Private Methods + // + //-------------------------------------------------------------------------- + + private void checkForNullAndDuplicateId(String id) + { + if (id == null) + { + // Cannot create ''{0}'' with null id. + MessageException me = new MessageException(); + me.setMessage(10039, new Object[]{"FlexClient"}); + me.setCode("Server.Processing.NullId"); + throw me; + } + + if (flexClients.containsKey(id)) + { + // Cannot create ''{0}'' with id ''{1}''; another ''{0}'' is already registered with the same id. + MessageException me = new MessageException(); + me.setMessage(10040, new Object[]{"FlexClient", id}); + me.setCode("Server.Processing.DuplicateId"); + throw me; + } + } + + private FlexClientOutboundQueueProcessor createDefaultOutboundQueueProcessor( + FlexClient flexClient, String endpointId) + { + FlexClientSettings flexClientSettings = broker.getFlexClientSettings(); + if (flexClientSettings == null) + return null; + + String queueProcessorClassName = flexClientSettings.getFlexClientOutboundQueueProcessorClassName(); + if (queueProcessorClassName == null) + return null; + + FlexClientOutboundQueueProcessor processor = null; + try + { + Class queueProcessorClass = createClass(queueProcessorClassName); + Object instance = ClassUtil.createDefaultInstance(queueProcessorClass, null); + processor = (FlexClientOutboundQueueProcessor)instance; + processor.setFlexClient(flexClient); + processor.setEndpointId(endpointId); + processor.initialize(flexClientSettings.getFlexClientOutboundQueueProcessorProperties()); + } + catch (Throwable t) + { + String message = "Failed to create MessageBroker's outbound queue processor for FlexClient with id '" + flexClient.getId() + "'."; + if (Log.isWarn()) + Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn(message, t); + + MessageException me = new MessageException(message, t); + throw me; + } + + return processor; + } + + private FlexClientOutboundQueueProcessor createCustomOutboundQueueProcessor( + FlexClient flexClient, String endpointId) + { + FlexClientOutboundQueueProcessor processor = null; + Endpoint endpoint = broker.getEndpoint(endpointId); + if (endpoint instanceof AbstractEndpoint) + { + Class processorClass = ((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorClass(); + if (processorClass != null) + { + try + { + Object instance = ClassUtil.createDefaultInstance(processorClass, null); + if (instance instanceof FlexClientOutboundQueueProcessor) + { + processor = (FlexClientOutboundQueueProcessor)instance; + processor.setFlexClient(flexClient); + processor.setEndpointId(endpointId); + processor.initialize(((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorConfig()); + } + } + catch (Throwable t) + { + if (Log.isWarn()) + Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn("Failed to create custom outbound queue processor for FlexClient with id '" + flexClient.getId() + "'. Using MessageBroker's default queue processor.", t); + } + } + } + return processor; + } + + private Class createClass(String className) + { + Class c = ClassUtil.createClass(className, FlexContext.getMessageBroker() == null ? null : + FlexContext.getMessageBroker().getClassLoader()); + + return c; + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientNotSubscribedException.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientNotSubscribedException.java b/core/src/flex/messaging/client/FlexClientNotSubscribedException.java new file mode 100644 index 0000000..f5ffe18 --- /dev/null +++ b/core/src/flex/messaging/client/FlexClientNotSubscribedException.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +import flex.messaging.MessageException; +import flex.messaging.log.LogEvent; + +/** + * + */ +public class FlexClientNotSubscribedException extends MessageException +{ + /** + * + */ + private static final long serialVersionUID = 773524927178340950L; + + //-------------------------------------------------------------------------- + // + // Properties + // + //-------------------------------------------------------------------------- + + //---------------------------------- + // defaultLogMessageIntro + //---------------------------------- + + /** + * Overrides the intro text for the log message. + */ + public String getDefaultLogMessageIntro() + { + return "FlexClient not subscribed: "; + } + + //---------------------------------- + // logStackTraceEnabled + //---------------------------------- + + /** + * Override to disable stack trace logging. + */ + public boolean isLogStackTraceEnabled() + { + return false; + } + + //---------------------------------- + // peferredLogLevel + //---------------------------------- + + /** + * Override to lower the preferred log level to debug. + */ + public short getPreferredLogLevel() + { + return LogEvent.DEBUG; + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java b/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java new file mode 100644 index 0000000..19cf19e --- /dev/null +++ b/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import flex.messaging.Destination; +import flex.messaging.MessageClient; +import flex.messaging.MessageDestination; +import flex.messaging.config.ConfigMap; +import flex.messaging.messages.Message; +import flex.messaging.services.messaging.ThrottleManager; +import flex.messaging.services.messaging.ThrottleManager.ThrottleResult; +import flex.messaging.services.messaging.ThrottleManager.ThrottleResult.Result; + +/** + * The base FlexClientOutboundQueueProcessor implementation used if a custom implementation is not + * specified. Its behavior is very simple. It adds all new messages in order to the tail + * of the outbound queue and flushes all queued messages to the network as quickly as possible. + * It also handles the outbound client-level throttling specified at the destination level. + */ +public class FlexClientOutboundQueueProcessor +{ + //-------------------------------------------------------------------------- + // + // Variables + // + //-------------------------------------------------------------------------- + + /** + * The associated FlexClient. + */ + private FlexClient client; + + /** + * The last MessageClient messages were flushed to. This is mainly for faster + * lookup. + */ + private MessageClient lastMessageClient; + + /** + * The associated endpoint's Id. + */ + private String endpointId; + + /** + * Manages throttling of outbound client level messages. + */ + protected OutboundQueueThrottleManager outboundQueueThrottleManager; + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * + * Stores the Id for the outbound queue's endpoint. + * + * @param value The Id for the outbound queue's endpoint. + */ + public void setEndpointId(String value) + { + endpointId = value; + } + + /** + * Returns the Id for the outbound queue's endpoint. + * + * @return The Id for the outbound queue's endpoint. + */ + public String getEndpointId() + { + return endpointId; + } + + /** + * + * Sets the associated FlexClient. + * + * @param value The associated FlexClient. + */ + public void setFlexClient(FlexClient value) + { + client = value; + } + + /** + * Returns the associated FlexClient. + * + * @return The associated FlexClient. + */ + public FlexClient getFlexClient() + { + return client; + } + + /** + * Returns the outbound queue throttle manager, or null if one does not exist. + * + * @return The outbound queue throttle manager. + */ + public OutboundQueueThrottleManager getOutboundQueueThrottleManager() + { + return outboundQueueThrottleManager; + } + + /** + * Utility method to initialize (if necessary) and return an outbound queue + * throttle manager. + * + * @return The outbound queue throttle manager. + */ + public OutboundQueueThrottleManager getOrCreateOutboundQueueThrottleManager() + { + if (outboundQueueThrottleManager == null) + outboundQueueThrottleManager = new OutboundQueueThrottleManager(this); + return outboundQueueThrottleManager; + } + + /** + * No-op; this default implementation doesn't require custom initialization. + * Subclasses may override to process any custom initialization properties that have been + * defined in the server configuration. + * + * @param properties A ConfigMap containing any custom initialization properties. + */ + public void initialize(ConfigMap properties) {} + + /** + * Always adds a new message to the tail of the queue. + * + * @param outboundQueue The queue of outbound messages. + * @param message The new message to add to the queue. + */ + public void add(List<Message> outboundQueue, Message message) + { + outboundQueue.add(message); + } + + /** + * Always empties the queue and returns all messages to be sent to the client. + * + * @param outboundQueue The queue of outbound messages. + * @return A FlushResult containing the messages that have been removed from the outbound queue + * to be written to the network and a wait time for the next flush of the outbound queue + * that is the default for the underlying Channel/Endpoint. + */ + public FlushResult flush(List<Message> outboundQueue) + { + return flush(null /* no client distinction */, outboundQueue); + } + + /** + * Removes all messages in the queue targeted to this specific MessageClient subscription(s) and + * returns them to be sent to the client. + * Overrides should be careful to only return messages for the specified MessageClient. + * + * @param messageClient The specific MessageClient to return messages for. + * @param outboundQueue The queue of outbound messages. + * @return A FlushResult containing the messages that have been removed from the outbound queue + * to be written to the network for this MessageClient. + */ + public FlushResult flush(MessageClient messageClient, List<Message> outboundQueue) + { + FlushResult flushResult = new FlushResult(); + List<Message> messagesToFlush = null; + + for (Iterator<Message> iter = outboundQueue.iterator(); iter.hasNext();) + { + Message message = iter.next(); + if (messageClient == null || (message.getClientId().equals(messageClient.getClientId()))) + { + if (isMessageExpired(message)) // Don't flush expired messages. + { + iter.remove(); + continue; + } + + messageClient = messageClient == null? getMessageClient(message) : messageClient; + + // First, apply the destination level outbound throttling. + ThrottleResult throttleResult = throttleOutgoingDestinationLevel(messageClient, message, false); + Result result = throttleResult.getResult(); + + // No destination level throttling; check destination-client level throttling. + if (Result.OK == result) + { + throttleResult = throttleOutgoingClientLevel(messageClient, message, false); + result = throttleResult.getResult(); + // If no throttling, simply add the message to the list. + if (Result.OK == result) + { + updateMessageFrequencyOutgoing(messageClient, message); + if (messagesToFlush == null) + messagesToFlush = new ArrayList<Message>(); + messagesToFlush.add(message); + } + // In rest of the policies (which is NONE), simply don't + // add the message to the list. + } + iter.remove(); + } + } + + flushResult.setMessages(messagesToFlush); + return flushResult; + } + + /** + * Utility method to test whether a message has expired or not. + * Messages with a timeToLive value that is shorter than the timespan from the message's + * timestamp up to the current system time will cause this method to return true. + * If there are expired messages in the outbound queue, flush implementations + * should use this helper method to only process and return messages that have + * not yet expired. + * + * @param message The message to test for expiration. + * + * @return true if the message has a timeToLive value that has expired; otherwise false. + */ + public boolean isMessageExpired(Message message) + { + return (message.getTimeToLive() > 0 && (System.currentTimeMillis() - message.getTimestamp()) >= message.getTimeToLive()); + } + + /** + * Attempts to throttle the outgoing message at the destination level. + * + * @param msgClient The client the message is intended for. + * @param message The message to consider to throttle. + * @param buffered Whether the message has already been buffered. In that case, + * parts of regular throttling code is skipped. + * @return The result of throttling attempt. + */ + protected ThrottleResult throttleOutgoingDestinationLevel(MessageClient msgClient, Message message, boolean buffered) + { + ThrottleManager throttleManager = getThrottleManager(msgClient); + if (throttleManager != null) + { + // In already buffered messages, don't use ThrottleManager#throttleOutgoingMessage + // to avoid regular throttling handling as the message has already been buffered. + if (buffered) + return throttleManager.throttleDestinationLevel(message, false /*incoming*/); + + // Otherwise, regular throttling. + return throttleManager.throttleOutgoingMessage(message); + } + return new ThrottleResult(); // Otherwise, return OK result. + } + + /** + * Attempts to throttle the outgoing message at the destination-client level. + * + * @param msgClient The client the message is intended for. + * @param message The message to consider to throttle. + * @param buffered Whether the message has already been buffered. In that case, + * parts of regular throttling code is skipped. + * @return The result of throttling attempt. + */ + protected ThrottleResult throttleOutgoingClientLevel(MessageClient msgClient, Message message, boolean buffered) + { + if (outboundQueueThrottleManager != null) // Means client level throttling enabled. + { + ThrottleResult throttleResult = outboundQueueThrottleManager.throttleOutgoingClientLevel(message); + if (!buffered) + { + ThrottleManager throttleManager = getThrottleManager(msgClient); + if (throttleManager != null) + throttleManager.handleOutgoingThrottleResult(message, throttleResult, true /*isClientLevel*/); + } + return throttleResult; + } + return new ThrottleResult(); // Otherwise, return OK result. + } + + /** + * Returns the message client that the message is intended to. + * + * @param message The message. + * @return The message client that the message is intended to. + */ + protected MessageClient getMessageClient(Message message) + { + // First try using the cached message client. + if (lastMessageClient != null && message.getClientId().equals(lastMessageClient.getClientId())) + { + return lastMessageClient; + } + else // Go ahead with the lookup. + { + lastMessageClient = client.getMessageClient((String)message.getClientId()); + return lastMessageClient; + } + } + + /** + * Returns the throttle manager associated with the destination the message + * is intended to. + * + * @param msgClient The message client; it can be null. + * @return The throttle manager. + */ + protected ThrottleManager getThrottleManager(MessageClient msgClient) + { + Destination destination = msgClient != null? msgClient.getDestination() : null; + return (destination != null && destination instanceof MessageDestination)? + ((MessageDestination)destination).getThrottleManager() : null; + } + + /** + * Updates the outgoing message's message frequency. + * + * @param msgClient The MessageClient that might have been passed to the flush; it can be null. + * @param message The message. + */ + protected void updateMessageFrequencyOutgoing(MessageClient msgClient, Message message) + { + // Update the destination level message frequency. + ThrottleManager throttleManager = getThrottleManager(msgClient); + if (throttleManager != null) + throttleManager.updateMessageFrequencyDestinationLevel(false /*incoming*/); + + // Update the client level message frequency. + if (outboundQueueThrottleManager != null) + outboundQueueThrottleManager.updateMessageFrequencyOutgoingClientLevel(message); + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlushResult.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlushResult.java b/core/src/flex/messaging/client/FlushResult.java new file mode 100644 index 0000000..ab400fa --- /dev/null +++ b/core/src/flex/messaging/client/FlushResult.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +import java.util.List; + +import flex.messaging.messages.Message; + +/** + * Stores the messages that should be written to the network as a result of a flush + * invocation on a FlexClient's outbound queue. + */ +public class FlushResult +{ + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * Constructs a <tt>FlushResult</tt> instance to return from a + * flush invocation on a FlexClient's outbound queue. + * This instance stores the list of messages to write over the network to + * the client as well as an optional wait time in milliseconds for when the + * next flush should be invoked. + */ + public FlushResult() {} + + //-------------------------------------------------------------------------- + // + // Properties + // + //-------------------------------------------------------------------------- + + //---------------------------------- + // messages + //---------------------------------- + + private List<Message> messages; + + /** + * Returns the messages to write to the network for this flush invocation. + * This list may be null, in which case no messages are written. + * + * @return The messages to write to the network for this flush invocation. + */ + public List<Message> getMessages() + { + return messages; + } + + /** + * Sets the messages to write to the network for this flush invocation. + * + * @param value The messages to write to the network for this flush invocation. + */ + public void setMessages(List<Message> value) + { + messages = value; + } + + //---------------------------------- + // nextFlushWaitTimeMillis + //---------------------------------- + + private int nextFlushWaitTimeMillis = 0; + + /** + * Returns the wait time in milliseconds for when the next flush invocation should occur. + * If this value is 0, the default, a delayed flush is not scheduled and the next flush will + * depend upon the underlying Channel/Endpoint. + * For client-side polling Channels the next flush invocation will happen when the client sends + * its next poll request at its regular interval. + * For client-side Channels that support direct writes to the client a flush invocation is triggered + * when the next message is added to the outbound queue. + * + * @return The wait time in milliseconds before flush is next invoked. A value of 0, the default, + * indicates that the default flush behavior for the underlying Channel/Endpoint should be + * used. + */ + public int getNextFlushWaitTimeMillis() + { + return nextFlushWaitTimeMillis; + } + + /** + * Sets the wait time in milliseconds for when the next flush invocation should occur. + * If this value is 0, the default, a delayed flush is not scheduled and the next flush will + * depend upon the underlying Channel/Endpoint. + * For client-side polling Channels the next flush invocation will happen when the client sends + * its next poll request at its regular interval. + * For client-side Channels that support direct writes to the client a flush invocation is triggered + * when the next message is added to the outbound queue. + * Negative value assignments are treated as 0. + * + * @param value The wait time in milliseconds before flush will be invoked. + */ + public void setNextFlushWaitTimeMillis(int value) + { + nextFlushWaitTimeMillis = (value < 1) ? 0 : value; + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/OutboundQueueThrottleManager.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/OutboundQueueThrottleManager.java b/core/src/flex/messaging/client/OutboundQueueThrottleManager.java new file mode 100644 index 0000000..24a66d0 --- /dev/null +++ b/core/src/flex/messaging/client/OutboundQueueThrottleManager.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +import java.util.concurrent.ConcurrentHashMap; + +import flex.messaging.MessageClient.SubscriptionInfo; +import flex.messaging.config.ThrottleSettings.Policy; +import flex.messaging.log.Log; +import flex.messaging.messages.Message; +import flex.messaging.services.messaging.MessageFrequency; +import flex.messaging.services.messaging.ThrottleManager; +import flex.messaging.services.messaging.ThrottleManager.ThrottleResult; +import flex.messaging.util.StringUtils; + + +/** + * Used to keep track of and limit outbound message rates of a single FlexClient queue. + * An outbound FlexClient queue can contain messages from multiple MessageClients + * across multiple destinations. It can also contain messages for multiple + * subscriptions (for each subtopic/selector) across the same destination for + * the same MessageClient. + */ +public class OutboundQueueThrottleManager +{ + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * Constructs a default outbound queue throttle manager. + * + * @param processor The outbound queue processor that is using this throttle manager. + */ + public OutboundQueueThrottleManager(FlexClientOutboundQueueProcessor processor) + { + destinationFrequencies = new ConcurrentHashMap<String, DestinationFrequency>(); + this.processor = processor; + } + + //-------------------------------------------------------------------------- + // + // Variables + // + //-------------------------------------------------------------------------- + + /** + * Map of destination id and destination message frequencies. + */ + protected final ConcurrentHashMap<String, DestinationFrequency> destinationFrequencies; + + /** + * The parent queue processor of the throttle manager. + */ + protected final FlexClientOutboundQueueProcessor processor; + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Registers the destination with the outbound throttle manager. + * + * @param destinationId The id of the destination. + * @param outboundMaxClientFrequency The outbound max-client-frequency specified + * at the destination. + * @param outboundPolicy The outbound throttle policy specified at the destination. + */ + public void registerDestination(String destinationId, int outboundMaxClientFrequency, Policy outboundPolicy) + { + DestinationFrequency frequency = destinationFrequencies.get(destinationId); + if (frequency == null) + { + frequency = new DestinationFrequency(outboundMaxClientFrequency, outboundPolicy); + destinationFrequencies.putIfAbsent(destinationId, frequency); + } + } + + /** + * Registers the subscription of a client talking to a destination with the + * specified subscription info. + * + * @param destinationId The destination id. + * @param si The subscription information. + */ + public void registerSubscription(String destinationId, SubscriptionInfo si) + { + DestinationFrequency frequency = destinationFrequencies.get(destinationId); + frequency.logMaxFrequencyDuringRegistration(frequency.outboundMaxClientFrequency, si); + } + + /** + * Unregisters the subscription. + * + * @param destinationId The destination id. + * @param si The subscription information. + */ + public void unregisterSubscription(String destinationId, SubscriptionInfo si) + { + unregisterDestination(destinationId); + } + + /** + * Unregisters all subscriptions of the client under the specified destination. + * + * @param destinationId The destination id. + */ + public void unregisterAllSubscriptions(String destinationId) + { + unregisterDestination(destinationId); + } + + /** + * Attempts to throttle the outgoing message. + * + * @param message The message to consider to throttle. + * @return True if the message was throttled; otherwise false. + */ + public ThrottleResult throttleOutgoingClientLevel(Message message) + { + String destinationId = message.getDestination(); + if (isDestinationRegistered(destinationId)) + { + DestinationFrequency frequency = destinationFrequencies.get(message.getDestination()); + int maxFrequency = frequency.getMaxFrequency(message); // Limit to check against. + MessageFrequency messageFrequency = frequency.getMessageFrequency(message); // Message rate of the client. + if (messageFrequency != null) + { + ThrottleResult result = messageFrequency.checkLimit(maxFrequency, frequency.outboundPolicy); + return result; + } + } + return new ThrottleResult(); // Otherwise, return OK result. + } + + /** + * Updates the outgoing client level message frequency of the message. + * + * @param message The message. + */ + public void updateMessageFrequencyOutgoingClientLevel(Message message) + { + String destinationId = message.getDestination(); + if (isDestinationRegistered(destinationId)) + { + DestinationFrequency frequency = destinationFrequencies.get(message.getDestination()); + MessageFrequency messageFrequency = frequency.getMessageFrequency(message); + if (messageFrequency != null) + messageFrequency.updateMessageFrequency(); + } + } + + //-------------------------------------------------------------------------- + // + // Protected Methods + // + //-------------------------------------------------------------------------- + + /** + * Determines whether the destination has been registered or not. + * + * @param destinationId The destination id. + * @return True if the destination with the specified id has been registered. + */ + protected boolean isDestinationRegistered(String destinationId) + { + return destinationFrequencies.containsKey(destinationId); + } + + /** + * Unregisters the destination from the outbound throttle manager. + * + * @param destinationId The id of the destination. + */ + protected void unregisterDestination(String destinationId) + { + if (isDestinationRegistered(destinationId)) + destinationFrequencies.remove(destinationId); + } + + //-------------------------------------------------------------------------- + // + // Inner Classes + // + //-------------------------------------------------------------------------- + + /** + * Used to keep track of max-client-frequency and outgoing throttle policy + * specified at the destination. It also keeps track of outbound message + * rates of all MessageClient subscriptions across the destination. + */ + class DestinationFrequency + { + protected final int outboundMaxClientFrequency; // destination specified client limit. + protected final MessageFrequency outboundClientFrequency; + protected final Policy outboundPolicy; // destination specified policy. + + /** + * Default constructor. + * + * @param outboundMaxClientFrequency The outbound throttling max-client-frequency of the destination. + * @param outboundPolicy The outbound throttling policy of the destination. + */ + DestinationFrequency(int outboundMaxClientFrequency, Policy outboundPolicy) + { + this.outboundMaxClientFrequency = outboundMaxClientFrequency; + this.outboundPolicy = outboundPolicy; + outboundClientFrequency = new MessageFrequency(outboundMaxClientFrequency); + } + + /** + * Returns the max-client-frequency for the subscription the message is + * intended for (which is simply the max-client-frequency specified at + * the destination). + * + * @param message The message. + * + * @return The max-frequency for the subscription. + */ + int getMaxFrequency(Message message) + { + return outboundMaxClientFrequency; + } + + /** + * Given a subscription the message is intended to, returns the message + * rate frequency for that subscription. + * + * @param message The message. + * @return The message frequency for the subscription, if it exists; otherwise null. + */ + MessageFrequency getMessageFrequency(Message message) + { + return outboundClientFrequency; + } + + /** + * Utility function to log the maxFrequency being used during subscription. + * + * @param maxFrequency The maxFrequency to log. + */ + void logMaxFrequencyDuringRegistration(int maxFrequency, SubscriptionInfo si) + { + if (Log.isDebug()) + Log.getLogger(ThrottleManager.LOG_CATEGORY).debug("Outbound queue throttle manager for FlexClient '" + + processor.getFlexClient().getId() + "' is using '" + maxFrequency + + "' as the throttling limit for its subscription: " + + StringUtils.NEWLINE + si); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/PollFlushResult.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/PollFlushResult.java b/core/src/flex/messaging/client/PollFlushResult.java new file mode 100644 index 0000000..dc8e33b --- /dev/null +++ b/core/src/flex/messaging/client/PollFlushResult.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +/** + * Extends <tt>FlushResult</tt> and adds additional properties for controlling + * client polling behavior. + */ +public class PollFlushResult extends FlushResult +{ + //-------------------------------------------------------------------------- + // + // Properties + // + //-------------------------------------------------------------------------- + + //---------------------------------- + // avoidBusyPolling + //---------------------------------- + + private boolean avoidBusyPolling; + + /** + * Indicates whether the handling of this result should attempt to avoid + * potential busy-polling cycles. + * This will be set to <code>true</code> in the case of two clients that are both + * long-polling the server over the same session. + * + * @return <code>true</code> if the handling of this result should attempt to avoid potential + * busy-polling cycles. + */ + public boolean isAvoidBusyPolling() + { + return avoidBusyPolling; + } + + /** + * Set to <code>true</code> to signal that handling for this result should attempt to avoid + * potential busy-polling cycles. + * + * @param value <code>true</code> to signal that handling for this result should attempt to + * avoid potential busy-polling cycles. + */ + public void setAvoidBusyPolling(boolean value) + { + avoidBusyPolling = value; + } + + //---------------------------------- + // clientProcessingSuppressed + //---------------------------------- + + private boolean clientProcessingSuppressed; + + /** + * Indicates whether client processing of this result should be + * suppressed. + * This should be <code>true</code> for results generated for poll requests + * that arrive while a long-poll request from the same client is being serviced + * to avoid a busy polling cycle. + * + * @return <code>true</code> if client processing of this result should be suppressed; + * otherwise <code>false</code>. + */ + public boolean isClientProcessingSuppressed() + { + return clientProcessingSuppressed; + } + + /** + * Set to <code>true</code> to suppress client processing of this result. + * Default is <code>false</code>. + * This should be set to <code>true</code> for results generated for poll requests + * that arrive while a long-poll request from the same client is being serviced + * to avoid a busy polling cycle. + * + * @param value <code>true</code> to suppress client processing of the result. + */ + public void setClientProcessingSuppressed(boolean value) + { + clientProcessingSuppressed = value; + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/PollWaitListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/PollWaitListener.java b/core/src/flex/messaging/client/PollWaitListener.java new file mode 100644 index 0000000..7978e6f --- /dev/null +++ b/core/src/flex/messaging/client/PollWaitListener.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +/** + * Used alongside invocations of <code>FlexClient.pollWithWait()</code> to allow calling code to + * maintain a record of the Objects being used to place waited poll requests into a wait + * state. This can be used to break the threads out of their wait state separately from the + * internal waited poll handling within <code>FlexClient</code>. + */ +public interface PollWaitListener +{ + /** + * Hook method invoked directly before a wait begins. + * + * @param notifier The <tt>Object</tt> being used to <code>wait()/notify()</code>. + */ + void waitStart(Object notifier); + + /** + * Hook method invoked directly after a wait completes. + * + * @param notifier The <tt>Object</tt> being used to <code>wait()/notify()</code>. + */ + void waitEnd(Object notifier); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/UserAgentSettings.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/UserAgentSettings.java b/core/src/flex/messaging/client/UserAgentSettings.java new file mode 100644 index 0000000..217f2c7 --- /dev/null +++ b/core/src/flex/messaging/client/UserAgentSettings.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +/** + * A class to hold user agent specific properties. For example, in streaming + * endpoints, a certain number of bytes need to be written before the + * streaming connection can be used and this value is specific to user agents. + * Similarly, the number of simultaneous connections a session can have is user + * agent specific. + */ +public class UserAgentSettings +{ + /** + * The prefixes of the version token used by various browsers. + */ + public static final String USER_AGENT_ANDROID = "Android"; + public static final String USER_AGENT_CHROME = "Chrome"; + public static final String USER_AGENT_FIREFOX = "Firefox"; + public static final String USER_AGENT_FIREFOX_1 = "Firefox/1"; + public static final String USER_AGENT_FIREFOX_2 = "Firefox/2"; + public static final String USER_AGENT_MSIE = "MSIE"; + public static final String USER_AGENT_MSIE_5 = "MSIE 5"; + public static final String USER_AGENT_MSIE_6 = "MSIE 6"; + public static final String USER_AGENT_MSIE_7 = "MSIE 7"; + public static final String USER_AGENT_OPERA = "Opera"; + public static final String USER_AGENT_OPERA_8 = "Opera 8"; + // Opera 10,11 ship as User Agent Opera/9.8. + public static final String USER_AGENT_OPERA_10 = "Opera/9.8"; + public static final String USER_AGENT_SAFARI = "Safari"; + + /** + * Bytes needed to kickstart the streaming connections for IE. + */ + public static final int KICKSTART_BYTES_MSIE = 2048; + /** + * Bytes needed to kickstart the streaming connections for SAFARI. + */ + public static final int KICKSTART_BYTES_SAFARI = 512; + /** + * Bytes needs to kicksart the streaming connections for Android. + */ + public static final int KICKSTART_BYTES_ANDROID = 4010; + + /** + * The default number of persistent connections per session for various browsers. + */ + private static final int MAX_PERSISTENT_CONNECTIONS_LEGACY = 1; + public static final int MAX_PERSISTENT_CONNECTIONS_DEFAULT = 5; + private static final int MAX_PERSISTENT_CONNECTIONS_OPERA_LEGACY = 3; + private static final int MAX_PERSISTENT_CONNECTIONS_CHROME = MAX_PERSISTENT_CONNECTIONS_DEFAULT; + private static final int MAX_PERSISTENT_CONNECTIONS_FIREFOX = MAX_PERSISTENT_CONNECTIONS_DEFAULT; + private static final int MAX_PERSISTENT_CONNECTIONS_MSIE = MAX_PERSISTENT_CONNECTIONS_DEFAULT; + private static final int MAX_PERSISTENT_CONNECTIONS_OPERA = 7; + private static final int MAX_PERSISTENT_CONNECTIONS_SAFARI = 3; + + private String matchOn; + private int kickstartBytes; + private int maxPersistentConnectionsPerSession = MAX_PERSISTENT_CONNECTIONS_DEFAULT; + + /** + * Static method to retrieve pre-initialized user agents which are as follows: + * + * In Chrome 0, 1, 2, the limit is 6: + * match-on="Chrome" max-persistent-connections-per-session="5" + * + * In Firefox 1, 2, the limit is 2: + * match-on="Firefox" max-persistent-connections-per-session="1" + * + * In Firefox 3, the limit is 6: + * match-on="Firefox/3" max-persistent-connections-per-session="5" + * + * In MSIE 5, 6, 7, the limit is 2 with kickstart bytes of 2K: + * match-on="MSIE" max-persistent-connections-per-session="1" kickstart-bytes="2048" + * + * In MSIE 8, the limit is 6 with kickstart bytes of 2K: + * match-on="MSIE 8" max-persistent-connections-per-session="5" kickstart-bytes="2048" + * + * In Opera 7, 9, the limit is 4: + * match-on="Opera" max-persistent-connections-per-session="3" + * + * In Opera 8, the limit is 8: + * match-on="Opera 8" max-persistent-connections-per-session="7" + * + * In Opera 10, the limit is 8. + * match-on="Opera 10" max-persistent-connections-per-session="7" + * + * In Safari 3, 4, the limit is 4. + * match-on="Safari" max-persistent-connections-per-session="3" + * + * @param matchOn String to use match the agent. + */ + public static UserAgentSettings getAgent(String matchOn) + { + UserAgentSettings userAgent = new UserAgentSettings(); + userAgent.setMatchOn(matchOn); + + if (USER_AGENT_ANDROID.equals(matchOn)) + { + userAgent.setKickstartBytes(KICKSTART_BYTES_ANDROID); + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_SAFARI); + } + if (USER_AGENT_CHROME.equals(matchOn)) + { + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_CHROME); + } + else if (USER_AGENT_FIREFOX.equals(matchOn)) + { + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_FIREFOX); + } + else if (USER_AGENT_FIREFOX_1.equals(matchOn)) + { + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); + } + else if (USER_AGENT_FIREFOX_2.equals(matchOn)) + { + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); + } + else if (USER_AGENT_MSIE.equals(matchOn)) + { + userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE); + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_MSIE); + } + else if (USER_AGENT_MSIE_5.equals(matchOn)) + { + userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE); + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); + } + else if (USER_AGENT_MSIE_6.equals(matchOn)) + { + userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE); + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); + } + else if (USER_AGENT_MSIE_7.equals(matchOn)) + { + userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE); + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY); + } + else if (USER_AGENT_OPERA.equals(matchOn)) + { + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA_LEGACY); + } + else if (USER_AGENT_OPERA_8.equals(matchOn)) + { + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA); + } + else if (USER_AGENT_OPERA_10.equals(matchOn)) + { + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA); + } + else if (USER_AGENT_SAFARI.equals(matchOn)) + { + userAgent.setKickstartBytes(KICKSTART_BYTES_SAFARI); + userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_SAFARI); + } + return userAgent; + } + + /** + * Returns the String to use to match the agent. + * + * @return The String to use to match the agent. + */ + public String getMatchOn() + { + return matchOn; + } + + /** + * Sets the String to use to match the agent. + * + * @param matchOn The String to use to match the agent. + */ + public void setMatchOn(String matchOn) + { + this.matchOn = matchOn; + } + + /** + * Returns the number of bytes needed to kickstart the streaming connections + * for the user agent. + * + * @return The number of bytes needed to kickstart the streaming connections + * for the user agent. + */ + public int getKickstartBytes() + { + return kickstartBytes; + } + + /** + * Sets the number of bytes needed to kickstart the streaming connections + * for the user agent. + * + * @param kickstartBytes The number of bytes needed to kickstart the streaming + * connections for the user agent. + */ + public void setKickstartBytes(int kickstartBytes) + { + if (kickstartBytes < 0) + kickstartBytes = 0; + this.kickstartBytes = kickstartBytes; + } + + /** + * @deprecated Use {@link UserAgentSettings#getMaxPersistentConnectionsPerSession()} instead. + */ + public int getMaxStreamingConnectionsPerSession() + { + return getMaxPersistentConnectionsPerSession(); + } + + /** + * @deprecated Use {@link UserAgentSettings#setMaxPersistentConnectionsPerSession(int)} instead. + */ + public void setMaxStreamingConnectionsPerSession(int maxStreamingConnectionsPerSession) + { + setMaxPersistentConnectionsPerSession(maxStreamingConnectionsPerSession); + } + + /** + * Returns the number of simultaneous streaming connections per session + * the user agent supports. + * + * @return The number of streaming connections per session the user agent supports. + */ + public int getMaxPersistentConnectionsPerSession() + { + return maxPersistentConnectionsPerSession; + } + + /** + * Sets the number of simultaneous streaming connections per session + * the user agent supports. + * + * @param maxStreamingConnectionsPerSession The number of simultaneous + * streaming connections per session the user agent supports. + */ + public void setMaxPersistentConnectionsPerSession(int maxStreamingConnectionsPerSession) + { + this.maxPersistentConnectionsPerSession = maxStreamingConnectionsPerSession; + } + +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/package-info.java b/core/src/flex/messaging/client/package-info.java new file mode 100644 index 0000000..97d5848 --- /dev/null +++ b/core/src/flex/messaging/client/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package flex.messaging.client; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/cluster/BroadcastHandler.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/cluster/BroadcastHandler.java b/core/src/flex/messaging/cluster/BroadcastHandler.java new file mode 100644 index 0000000..f036622 --- /dev/null +++ b/core/src/flex/messaging/cluster/BroadcastHandler.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.cluster; + +import java.util.List; + +/** + * + * This interface represents a handler for a message broadcast by a Cluster. + * Clusters broadcast messages across their physical nodes, and when they + * receive those messages they locate a BroadcastHandler capable of handling + * the broadcast. + */ +public interface BroadcastHandler +{ + /** + * Handle the broadcast message. + * + * @param sender sender of the original message + * @param params any parameters need to handle the message + */ + void handleBroadcast(Object sender, List<Object> params); + + /** + * Determine whether this Handler supports a particular operation by name. + * + * @return whether or not this handler supports the named operation + * @param name name of the operation + */ + boolean isSupportedOperation(String name); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/cluster/Cluster.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/cluster/Cluster.java b/core/src/flex/messaging/cluster/Cluster.java new file mode 100644 index 0000000..0339c55 --- /dev/null +++ b/core/src/flex/messaging/cluster/Cluster.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.cluster; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.w3c.dom.Element; + +import flex.messaging.config.ConfigMap; +import flex.messaging.log.LogCategories; + +/** + * + * Base interface for cluster implementations. + */ +public abstract class Cluster +{ + /** + * Default log category for clustering. + */ + public static final String LOG_CATEGORY = LogCategories.SERVICE_CLUSTER; + + /** + * Listeners to be notified when a node is removed from the cluster. + */ + List removeNodeListeners = Collections.synchronizedList(new ArrayList()); + + /** + * Cluster properties file. + */ + Element clusterPropertiesFile; + + /** + * Specifies whether or not this is the default cluster. + */ + boolean def; + + /** + * Specifies if this cluster is enabled for URL load balancing. + */ + boolean urlLoadBalancing; + + /** + * Because destinations are the constructs which become clustered, clusters + * are identified by a unique name composed in the format + * "serviceType:destinationId". + * + * @return The unique name for the clustered destination. + * @param serviceType The name of the service for this destination. + * @param destinationName The original name of the destination. + */ + static String getClusterDestinationKey(String serviceType, String destinationName) + { + StringBuffer sb = new StringBuffer(); + sb.append(serviceType); + sb.append(':'); + sb.append(destinationName); + return sb.toString(); + } + + /** + * Add a listener for remove cluster node notification. + * + * @param listener the RemoveNodeListener to add + */ + public void addRemoveNodeListener(RemoveNodeListener listener) + { + removeNodeListeners.add(listener); + } + + /** + * Send notification to remove node listeners that a node has + * been removed from the cluster. + * + * @param address The node that was removed from the cluster. + */ + protected void sendRemoveNodeListener(Object address) + { + synchronized (removeNodeListeners) + { + for (int i = 0; i < removeNodeListeners.size(); i++) + ((RemoveNodeListener)removeNodeListeners.get(i)).removeClusterNode(address); + } + } + + /** + * Initializes the Cluster with id and the map of properties. The default + * implementation is no-op. + * + * @param id The cluster id. + * @param properties The map of properties. + */ + public void initialize(String id, ConfigMap properties) + { + // No-op. + } + + /** + * Returns the cluster properties file. + * + * @return The cluster properties file. + */ + public Element clusterPropertiesFile() + { + return clusterPropertiesFile; + } + + /** + * Sets the cluster properties file. + * + * @param value The cluster properties file. + */ + public void setClusterPropertiesFile(Element value) + { + this.clusterPropertiesFile = value; + } + + /** + * Returns true if this is the default cluster for any destination that does not + * specify a clustered destination. + * + * @return Returns true if this is the default cluster. + */ + public boolean isDefault() + { + return def; + } + + /** + * When true, this is the default cluster for any destination that does not + * specify a clustered destination. + * + * @param d true if this is the default cluster + */ + public void setDefault(boolean d) + { + this.def = d; + } + + /** + * When true, this cluster is enabled for URL load balancing. + * + * @return true if this cluster enabled for load balancing. + */ + public boolean getURLLoadBalancing() + { + return urlLoadBalancing; + } + + /** + * When true, the cluster is enabled for URL load balancing. + * + * @param u the flag to enable the URL load balancing + */ + public void setURLLoadBalancing(boolean u) + { + urlLoadBalancing = u; + } + + /** + * Shutdown the cluster. + */ + public abstract void destroy(); + + /** + * Retrieve a List of Maps, where each Map contains channel id keys + * mapped to endpoint URLs for the given service type and destination name. + * There is exactly one endpoint URL for each + * channel id. This List represents all of the known endpoint URLs + * for all of the channels in the Cluster. + * @param serviceType the service type + * @param destName the destination name + * @return List of maps of channel ids to endpoint URLs for each node in + * the cluster. + */ + public abstract List getAllEndpoints(String serviceType, String destName); + + /** + * Returns a list of all of the nodes of this cluster. + * @return List a list of member IP addresses in the cluster + */ + public abstract List getMemberAddresses(); + + /** + * Returns the local cluster node. + * @return Object the Local Address object + */ + public abstract Object getLocalAddress(); + + /** + * Broadcast a service-related operation, which usually includes a Message as a method parameter. This method + * allows a local service to process a Message and then send the Message to the services on all peer nodes + * so that they may perform the same processing. + * + * @param serviceOperation The operation to broadcast. + * @param params Parameters for the operation. + */ + public abstract void broadcastServiceOperation(String serviceOperation, Object[] params); + + /** + * Send a service-related operation in point-to-point fashion to one and only one member of the cluster. + * This is similar to the broadcastServiceOperation except that this invocation is sent to the first + * node among the cluster members that does not have the local node's address. + * + * @param serviceOperation The operation to send. + * @param params Parameters for the operation. + * @param targetAddress the target address of a remote node in the cluster + */ + public abstract void sendPointToPointServiceOperation(String serviceOperation, Object[] params, Object targetAddress); + + /** + * Add a local endpoint URL for a local channel. After doing so, broadcast the information to + * peers so that they will be aware of the URL. + * + * @param serviceType the service type of the endpoint + * @param destName the destination name + * @param channelId the Channel ID + * @param endpointUrl the endpoint URL + * @param endpointPort the endpoint port + */ + public abstract void addLocalEndpointForChannel(String serviceType, String destName, + String channelId, String endpointUrl, int endpointPort); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/cluster/ClusterException.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/cluster/ClusterException.java b/core/src/flex/messaging/cluster/ClusterException.java new file mode 100644 index 0000000..c0dc41d --- /dev/null +++ b/core/src/flex/messaging/cluster/ClusterException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.cluster; + +import flex.messaging.MessageException; + +/** + * + * Exception type for cluster errors. + */ +public class ClusterException extends MessageException +{ + /** + * Serializable version uid. + */ + static final long serialVersionUID = 1948590697997522770L; +}
