http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/Server.java ---------------------------------------------------------------------- diff --git a/modules/core/src/flex/messaging/Server.java b/modules/core/src/flex/messaging/Server.java old mode 100755 new mode 100644 index 3022fe6..dd6bf25 --- a/modules/core/src/flex/messaging/Server.java +++ b/modules/core/src/flex/messaging/Server.java @@ -1,35 +1,35 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging; - -/** - * The interface for a shared server instance that may be associated with a - * <tt>MessageBroker</tt> and used by endpoints. - */ -public interface Server extends FlexComponent -{ - /** - * Returns the id for the server. - * Endpoints can lookup server instances that have been associated with a <tt>MessageBroker</tt> using {@link MessageBroker#getServer(String)}. - */ - String getId(); - - /** - * Sets the id for the server. - */ - void setId(String value); -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +/** + * The interface for a shared server instance that may be associated with a + * <tt>MessageBroker</tt> and used by endpoints. + */ +public interface Server extends FlexComponent +{ + /** + * Returns the id for the server. + * Endpoints can lookup server instances that have been associated with a <tt>MessageBroker</tt> using {@link MessageBroker#getServer(String)}. + */ + String getId(); + + /** + * Sets the id for the server. + */ + void setId(String value); +}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/ServiceValidationListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/flex/messaging/ServiceValidationListener.java b/modules/core/src/flex/messaging/ServiceValidationListener.java old mode 100755 new mode 100644 index 4cecbd7..cb08fe1 --- a/modules/core/src/flex/messaging/ServiceValidationListener.java +++ b/modules/core/src/flex/messaging/ServiceValidationListener.java @@ -1,30 +1,30 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging; - -/** - * @exclude - */ -public interface ServiceValidationListener -{ - /** - * This method gets called before any other processing of the describeServices method. - * It allows a hook for external systems that need to update the service destinations at runtime. - */ - void validateServices(); - void validateDestination(String destination); +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +/** + * @exclude + */ +public interface ServiceValidationListener +{ + /** + * This method gets called before any other processing of the describeServices method. + * It allows a hook for external systems that need to update the service destinations at runtime. + */ + void validateServices(); + void validateDestination(String destination); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/VersionInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/flex/messaging/VersionInfo.java b/modules/core/src/flex/messaging/VersionInfo.java old mode 100755 new mode 100644 index 4ff2cc6..8275f60 --- a/modules/core/src/flex/messaging/VersionInfo.java +++ b/modules/core/src/flex/messaging/VersionInfo.java @@ -1,116 +1,116 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging; - -import flex.messaging.util.StringUtils; - - -/** - * Class representing the build version of Data Services. - * - *@exclude - */ -public class VersionInfo -{ - //Cache this info as it should not change during the time class is loaded - public static String BUILD_MESSAGE; - public static String BUILD_NUMBER_STRING; - public static String BUILD_TITLE; - public static long BUILD_NUMBER; - - private static final String LCDS_CLASS = "flex.data.DataService"; - - public static String buildMessage() - { - if (BUILD_MESSAGE == null) - { - try - { - //Ensure we've parsed build info - getBuild(); - - if (StringUtils.isEmpty(BUILD_NUMBER_STRING)) - { - BUILD_MESSAGE = BUILD_TITLE; - } - else - { - BUILD_MESSAGE = BUILD_TITLE + ": " + BUILD_NUMBER_STRING; - } - } - catch (Throwable t) - { - BUILD_MESSAGE = BUILD_TITLE +": information unavailable"; - } - } - - return BUILD_MESSAGE; - } - - public static long getBuildAsLong() - { - if (BUILD_NUMBER == 0) - { - getBuild(); - - if (BUILD_NUMBER_STRING != null && !BUILD_NUMBER_STRING.equals("")) - { - try - { - BUILD_NUMBER = Long.parseLong(BUILD_NUMBER_STRING); - } - catch (NumberFormatException nfe) - { - // ignore, just return 0 - } - } - } - - return BUILD_NUMBER; - } - - public static String getBuild() - { - if (BUILD_NUMBER_STRING == null) - { - Class classToUseForManifest; - - try - { - classToUseForManifest = Class.forName(LCDS_CLASS); - } - catch (ClassNotFoundException e) - { - classToUseForManifest = VersionInfo.class; - } - - try - { - BUILD_NUMBER_STRING = ""; - Package pack = classToUseForManifest.getPackage(); - BUILD_NUMBER_STRING = pack.getImplementationVersion(); - BUILD_TITLE = pack.getImplementationTitle(); - } - catch (Throwable t) - { - // ignore, just return empty string - } - } - - return BUILD_NUMBER_STRING; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +import flex.messaging.util.StringUtils; + + +/** + * Class representing the build version of Data Services. + * + *@exclude + */ +public class VersionInfo +{ + //Cache this info as it should not change during the time class is loaded + public static String BUILD_MESSAGE; + public static String BUILD_NUMBER_STRING; + public static String BUILD_TITLE; + public static long BUILD_NUMBER; + + private static final String LCDS_CLASS = "flex.data.DataService"; + + public static String buildMessage() + { + if (BUILD_MESSAGE == null) + { + try + { + //Ensure we've parsed build info + getBuild(); + + if (StringUtils.isEmpty(BUILD_NUMBER_STRING)) + { + BUILD_MESSAGE = BUILD_TITLE; + } + else + { + BUILD_MESSAGE = BUILD_TITLE + ": " + BUILD_NUMBER_STRING; + } + } + catch (Throwable t) + { + BUILD_MESSAGE = BUILD_TITLE +": information unavailable"; + } + } + + return BUILD_MESSAGE; + } + + public static long getBuildAsLong() + { + if (BUILD_NUMBER == 0) + { + getBuild(); + + if (BUILD_NUMBER_STRING != null && !BUILD_NUMBER_STRING.equals("")) + { + try + { + BUILD_NUMBER = Long.parseLong(BUILD_NUMBER_STRING); + } + catch (NumberFormatException nfe) + { + // ignore, just return 0 + } + } + } + + return BUILD_NUMBER; + } + + public static String getBuild() + { + if (BUILD_NUMBER_STRING == null) + { + Class classToUseForManifest; + + try + { + classToUseForManifest = Class.forName(LCDS_CLASS); + } + catch (ClassNotFoundException e) + { + classToUseForManifest = VersionInfo.class; + } + + try + { + BUILD_NUMBER_STRING = ""; + Package pack = classToUseForManifest.getPackage(); + BUILD_NUMBER_STRING = pack.getImplementationVersion(); + BUILD_TITLE = pack.getImplementationTitle(); + } + catch (Throwable t) + { + // ignore, just return empty string + } + } + + return BUILD_NUMBER_STRING; + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/client/AsyncPollHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/flex/messaging/client/AsyncPollHandler.java b/modules/core/src/flex/messaging/client/AsyncPollHandler.java old mode 100755 new mode 100644 index 64573e3..2817419 --- a/modules/core/src/flex/messaging/client/AsyncPollHandler.java +++ b/modules/core/src/flex/messaging/client/AsyncPollHandler.java @@ -1,31 +1,31 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -/** - * Defines the interface to handle asynchronous poll results. - */ -public interface AsyncPollHandler -{ - /** - * Invoked by the <tt>FlexClient</tt> when an asynchronous poll result is available. - * - * @param flushResult The flush result containing messages to return in the poll response and - * an optional wait time before the client should issue its next poll. - */ - void asyncPollComplete(FlushResult flushResult); -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +/** + * Defines the interface to handle asynchronous poll results. + */ +public interface AsyncPollHandler +{ + /** + * Invoked by the <tt>FlexClient</tt> when an asynchronous poll result is available. + * + * @param flushResult The flush result containing messages to return in the poll response and + * an optional wait time before the client should issue its next poll. + */ + void asyncPollComplete(FlushResult flushResult); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/client/EndpointPushHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/flex/messaging/client/EndpointPushHandler.java b/modules/core/src/flex/messaging/client/EndpointPushHandler.java old mode 100755 new mode 100644 index a043cb7..8e9c19e --- a/modules/core/src/flex/messaging/client/EndpointPushHandler.java +++ b/modules/core/src/flex/messaging/client/EndpointPushHandler.java @@ -1,79 +1,79 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -import java.util.List; - -import flex.messaging.MessageClient; -import flex.messaging.messages.Message; - -/** - * Defines the interface for a handler that may be registered by an endpoint with a <tt>FlexClient</tt> in order - * to push messages to a connected client. - */ -public interface EndpointPushHandler -{ - /** - * Invoked to shut down the handler. - * It may be invoked by the endpoint when the underlying connection it manages to the client closes, - * or by the <tt>FlexClient</tt> if it is invalidated. - * The implementation of this method should release any resources, and should not attempt to notify the - * client of an explicit disconnect. - * - * @see #close(boolean) - */ - void close(); - - /** - * Invoked to shut down the handler. - * It may be invoked by the endpoint when the underlying connection it manages to the client closes, - * or by the <tt>FlexClient</tt> if it is invalidated. - * The implementation of this method should release any resources, and may attempt to notify the client - * Channel that it has been disconnected in order to suppress automatic reconnect behavior. - * - * @param disconnectChannel True to attempt to notify the client of an explicit disconnect in order to - * suppress automatic reconnect behavior. - */ - void close(boolean disconnectChannel); - - /** - * Invoked by the <tt>FlexClient</tt> when it has messages to push to - * the client. - * - * @param messagesToPush The list of messages to push. - */ - void pushMessages(List<Message> messagesToPush); - - /** - * Invoked to notify the handler that the <tt>MessageClient</tt> subscription is using this handler. - * If subscriptions should be invalidated if the handler is closed, it should retain references to - * all registered <tt>MessageClient</tt> instances and invalidate them when it closes. - * - * @param messageClient The <tt>MessageClient</tt> subscription using this handler. - */ - void registerMessageClient(MessageClient messageClient); - - /** - * Invoked to notify the handler that a <tt>MessageClient</tt> subscription that was using it has - * been invalidated. - * If the handler is tracking the set of <tt>MessageClient</tt> instances that are using it, the handler should - * remove the instance from its set. - * - * @param messageClient The <tt>MessageClient</tt> subscription no longer using this handler. - */ - void unregisterMessageClient(MessageClient messageClient); -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +import java.util.List; + +import flex.messaging.MessageClient; +import flex.messaging.messages.Message; + +/** + * Defines the interface for a handler that may be registered by an endpoint with a <tt>FlexClient</tt> in order + * to push messages to a connected client. + */ +public interface EndpointPushHandler +{ + /** + * Invoked to shut down the handler. + * It may be invoked by the endpoint when the underlying connection it manages to the client closes, + * or by the <tt>FlexClient</tt> if it is invalidated. + * The implementation of this method should release any resources, and should not attempt to notify the + * client of an explicit disconnect. + * + * @see #close(boolean) + */ + void close(); + + /** + * Invoked to shut down the handler. + * It may be invoked by the endpoint when the underlying connection it manages to the client closes, + * or by the <tt>FlexClient</tt> if it is invalidated. + * The implementation of this method should release any resources, and may attempt to notify the client + * Channel that it has been disconnected in order to suppress automatic reconnect behavior. + * + * @param disconnectChannel True to attempt to notify the client of an explicit disconnect in order to + * suppress automatic reconnect behavior. + */ + void close(boolean disconnectChannel); + + /** + * Invoked by the <tt>FlexClient</tt> when it has messages to push to + * the client. + * + * @param messagesToPush The list of messages to push. + */ + void pushMessages(List<Message> messagesToPush); + + /** + * Invoked to notify the handler that the <tt>MessageClient</tt> subscription is using this handler. + * If subscriptions should be invalidated if the handler is closed, it should retain references to + * all registered <tt>MessageClient</tt> instances and invalidate them when it closes. + * + * @param messageClient The <tt>MessageClient</tt> subscription using this handler. + */ + void registerMessageClient(MessageClient messageClient); + + /** + * Invoked to notify the handler that a <tt>MessageClient</tt> subscription that was using it has + * been invalidated. + * If the handler is tracking the set of <tt>MessageClient</tt> instances that are using it, the handler should + * remove the instance from its set. + * + * @param messageClient The <tt>MessageClient</tt> subscription no longer using this handler. + */ + void unregisterMessageClient(MessageClient messageClient); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/client/EndpointPushNotifier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/flex/messaging/client/EndpointPushNotifier.java b/modules/core/src/flex/messaging/client/EndpointPushNotifier.java old mode 100755 new mode 100644 index 925f7e1..151699e --- a/modules/core/src/flex/messaging/client/EndpointPushNotifier.java +++ b/modules/core/src/flex/messaging/client/EndpointPushNotifier.java @@ -1,461 +1,461 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import flex.messaging.FlexContext; -import flex.messaging.FlexSession; -import flex.messaging.FlexSessionListener; -import flex.messaging.MessageClient; -import flex.messaging.MessageClientListener; -import flex.messaging.endpoints.BaseStreamingHTTPEndpoint; -import flex.messaging.endpoints.Endpoint; -import flex.messaging.log.Log; -import flex.messaging.log.LogCategories; -import flex.messaging.messages.AsyncMessage; -import flex.messaging.messages.CommandMessage; -import flex.messaging.util.TimeoutAbstractObject; -import flex.messaging.util.UUIDUtils; - -/** - * @exclude - * Instances of this class are used by endpoints that support streaming - * outbound data to connected clients when the client is not polling and - * the FlexSession representing the connection does not support push directly. - * This generally requires that the client and endpoint establish a separate, - * physical connection for pushed data that is part of a larger, logical - * connection/session. - * <p> - * When the endpoint establishes this physical streaming connection it will - * create an instance of this class, register it with the FlexClient and then - * wait on the public <code>pushNeeded</code> condition variable. - * When data arrives to push to the remote client, the FlexClient will queue it - * with this notifier instance and the waiting endpoint will be notified. - * The endpoint will retrieve the queued messages from the notifier instance and will - * stream them to the client and then go back into a wait state on the - * <code>pushNeeded</code> condition variable. - * </p><p> - * Note that this implementation is based upon <code>Object.wait()</code>; it is not a - * non-blocking implementation. - * </p> - */ -public class EndpointPushNotifier extends TimeoutAbstractObject implements EndpointPushHandler, FlexSessionListener, MessageClientListener -{ - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * Constructs a PushNotifier instance. - * - * @param endpoint The endpoint that will use this notifier. - * @param flexClient The FlexClient that will use this notifier. - */ - public EndpointPushNotifier(Endpoint endpoint, FlexClient flexClient) - { - notifierId = UUIDUtils.createUUID(false /* doesn't need to be secure */); - this.endpoint = endpoint; - this.flexClient = flexClient; - flexClient.registerEndpointPushHandler(this, endpoint.getId()); - flexSession = FlexContext.getFlexSession(); - if (flexSession != null) - flexSession.addSessionDestroyedListener(this); - invalidateMessageClientOnStreamingClose = (endpoint instanceof BaseStreamingHTTPEndpoint)? - ((BaseStreamingHTTPEndpoint)endpoint).isInvalidateMessageClientOnStreamingClose() : false; - updateLastUse(); // Initialize last use timestamp to construct time. - } - - //-------------------------------------------------------------------------- - // - // Public Variables - // - //-------------------------------------------------------------------------- - - /** - * The condition variable that the endpoint waits on for pushed data to arrive. - */ - public final Object pushNeeded = new Object(); - - //-------------------------------------------------------------------------- - // - // Private Variables - // - //-------------------------------------------------------------------------- - - /** - * Flag indicating whether the notifier has been closed/shut down. - * This is used to signal a waiting endpoint that it should break out of its - * wait loop and close its streaming connection. - */ - private volatile boolean closed; - - /** - * Flag indicating that the notifier has started closing; used to allow only - * one thread to execute the close() logic and delay flipping closed to true - * to allow any final messages to be streamed to the client before the endpoint - * using the notifier breaks out of its wait/notify loop and terminates the - * streaming connection. - */ - private volatile boolean closing; - - /** - * The number of minutes a client can remain idle before the server - * times the notifier out. - */ - private int idleTimeoutMinutes; - - /** - * Whether to invalidate the message-client when the streaming connection is closed. - */ - private final boolean invalidateMessageClientOnStreamingClose; - - /** - * The endpoint that uses this notifier. - */ - private final Endpoint endpoint; - - /** - * The FlexClient this notifier is associated with. - */ - private final FlexClient flexClient; - - /** - * The FlexSession this notifier is associated with. - */ - private final FlexSession flexSession; - - /** - * Lock for instance-level synchronization. - */ - private final Object lock = new Object(); - - /** - * Log category used by the notifier. Initialized to ENDPOINT_GENERAL but - * endpoints using this notifier should set it to their own log categories. - */ - private String logCategory = LogCategories.ENDPOINT_GENERAL; - - /** - * Queue of messages that the FlexClient will populate and the endpoint will drain to - * stream to the client. - */ - private List<AsyncMessage> messages; - - /** - * List of MessageClient subscriptions using this endpoint push notifier. - * When this notifier is closed, any associated subscriptions need to be invalidated. - */ - private final CopyOnWriteArrayList<MessageClient> messageClients = new CopyOnWriteArrayList<MessageClient>(); - - /** - * Unique identifier for this instance. - */ - private final String notifierId; - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Moves this notifier to a closed state, notifying any listeners, - * associated subscriptions and waiting endpoints. - * Does not attempt to notify the client Channel of the disconnect thereby allowing - * automatic reconnect processing to run. - */ - public void close() - { - close(false); - } - - /** - * Moves this notifier to a closed state, notifying any listeners, - * associated subscriptions and waiting endpoints. - * Attempts to notify the client Channel of an explicit disconnect, thereby suppressing - * automatic reconnect processing. - * - * @param disconnectChannel True to attempt to notify the client Channel of the disconnect - * and suppress automatic reconnect processing. - */ - public void close(boolean disconnectChannel) - { - synchronized (lock) - { - if (closed || closing) - return; - - closing = true; - } - - cancelTimeout(); - - if (flexSession != null) - flexSession.removeSessionDestroyedListener(this); - - // Shut down flow of further messages to this notifier. - flexClient.unregisterEndpointPushHandler(this, endpoint.getId()); - - // Push a disconnect command down to the client to suppress automatic reconnect. - if (disconnectChannel) - { - ArrayList<AsyncMessage> list = new ArrayList<AsyncMessage>(1); - CommandMessage disconnect = new CommandMessage(CommandMessage.DISCONNECT_OPERATION); - list.add(disconnect); - pushMessages(list); - } - - // Invalidate associated subscriptions; this doesn't attempt to notify the client. - // Any client subscriptions made over this endpoint will be automatically invalidated - // on the client when it receives its channel disconnect event. - if (invalidateMessageClientOnStreamingClose) - { - for (Iterator<MessageClient> iter = messageClients.iterator() ; iter.hasNext();) - iter.next().invalidate(); - } - - // Move to final closed state; after this we need to notify one last time to stream - // any final messages to the client and allow the endpoint to shut down its streaming - // connection. - synchronized (lock) - { - closed = true; - closing = false; - } - synchronized (pushNeeded) - { - pushNeeded.notifyAll(); - } - } - - /** - * Returns any messages available to push to the client, and removes them - * from this notifier. - * Notified endpoints should invoke this method to retrieve messages, stream them - * to the client and then re-enter the wait state. - * This method acquires a lock on <code>pushNeeded</code>. - * - * @return The messages to push to the client. - */ - public List<AsyncMessage> drainMessages() - { - synchronized (pushNeeded) - { - List<AsyncMessage> messagesToPush = messages; - messages = null; - return messagesToPush; - } - } - - /** - * Returns whether the notifier has closed; used to break the endpoint's wait cycle. - * - * @return True if the notifier has closed; otherwise false. - */ - public boolean isClosed() - { - return closed; - } - - /** - * Returns the endpoint that is using this notifier. - * - * @return The endpoint using this notifier. - */ - public Endpoint getEndpoint() - { - return endpoint; - } - - /** - * Returns the idle timeout minutes used by the notifier. - * - * @return The idle timeout minutes used by the notifier. - */ - public int getIdleTimeoutMinutes() - { - return idleTimeoutMinutes; - } - - /** - * Sets the idle timeout minutes used by the notifier. - * - * @param idleTimeoutMinutes The idle timeout minutes used by the notifier. - */ - public void setIdleTimeoutMinutes(int idleTimeoutMinutes) - { - this.idleTimeoutMinutes = idleTimeoutMinutes; - } - - /** - * Returns the log category used by this notifier. - * - * @return The log category used by this notifier. - */ - public String getLogCategory() - { - return logCategory; - } - - /** - * Sets the log category used by this notifier. Endpoints using this notifier - * should set it to their own log categories. - * - * @param logCategory The log category for the notifier to use. - */ - public void setLogCategory(String logCategory) - { - this.logCategory = logCategory; - } - - /** - * Returns the unique id for this notifier. - * - * @return The unique id for this notifier. - */ - public String getNotifierId() - { - return notifierId; - } - - /** - * @exclude - * Implements TimeoutCapable. - * Determine the time, in milliseconds, that this object is allowed to idle - * before having its timeout method invoked. - */ - public long getTimeoutPeriod() - { - return (idleTimeoutMinutes * 60 * 1000); - } - - /** - * @exclude - */ - public void messageClientCreated(MessageClient messageClient) - { - // No-op. - } - - /** - * @exclude - */ - public void messageClientDestroyed(MessageClient messageClient) - { - unregisterMessageClient(messageClient); - } - - /** - * Used by FlexClient to push messages to the endpoint. - * This method will automatically notify a waiting endpoint, if one exists - * and it acquires a lock on <code>pushNeeded</code>. - * - * @param messages The messages to push to the client. - */ - public void pushMessages(List messagesToPush) - { - if (!messagesToPush.isEmpty()) - { - synchronized (pushNeeded) - { - // Push these straight on through; notify immediately. - if (messages == null) - messages = messagesToPush; - else - messages.addAll(messagesToPush); - - // If the notifier isn't closing, notify; otherwise just add and the close will - // notify once it completes. - if (!closing) - pushNeeded.notifyAll(); - } - } - } - - /** - * Registers a MessageClient subscription that depends on this notifier. - * - * @param messageClient A MessageClient that depends on this notifier. - */ - public void registerMessageClient(MessageClient messageClient) - { - if (messageClient != null) - { - if (messageClients.addIfAbsent(messageClient)) - messageClient.addMessageClientDestroyedListener(this); - } - } - - /** - * Handle session creation events. This handler is a no-op because the notifier - * is only concerned with its associated session's destruction. - * - * @param flexSession The newly created FlexSession. - */ - public void sessionCreated(FlexSession flexSession) - { - // No-op. - } - - /** - * Handle session destruction events. This will be invoked when the notifier's - * associated session is invalidated, and this will trigger the notifier to close. - * - * @param flexSession The FlexSession being invalidated. - */ - public void sessionDestroyed(FlexSession flexSession) - { - if (Log.isInfo()) - Log.getLogger(logCategory).info("Endpoint with id '" + endpoint.getId() + "' is closing" - + " a streaming connection for the FlexClient with id '" + flexClient.getId() + "'" - + " since its associated session has been destroyed."); - close(true /* disconnect client Channel */); - } - - /** - * @exclude - * Implements TimeoutCapable. - * Inform the object that it has timed out. - */ - public void timeout() - { - if (Log.isInfo()) - Log.getLogger(logCategory).info("Endpoint with id '" + endpoint.getId() + "' is timing out" - + " a streaming connection for the FlexClient with id '" + flexClient.getId() + "'"); - close(true /* disconnect client Channel */); - } - - /** - * Unregisters a MessageClient subscription that depended on this notifier. - * - * @param messageClient A MessageClient that depended on this notifier. - */ - public void unregisterMessageClient(MessageClient messageClient) - { - if (messageClient != null) - { - messageClient.removeMessageClientDestroyedListener(this); - messageClients.remove(messageClient); - } - } +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import flex.messaging.FlexContext; +import flex.messaging.FlexSession; +import flex.messaging.FlexSessionListener; +import flex.messaging.MessageClient; +import flex.messaging.MessageClientListener; +import flex.messaging.endpoints.BaseStreamingHTTPEndpoint; +import flex.messaging.endpoints.Endpoint; +import flex.messaging.log.Log; +import flex.messaging.log.LogCategories; +import flex.messaging.messages.AsyncMessage; +import flex.messaging.messages.CommandMessage; +import flex.messaging.util.TimeoutAbstractObject; +import flex.messaging.util.UUIDUtils; + +/** + * @exclude + * Instances of this class are used by endpoints that support streaming + * outbound data to connected clients when the client is not polling and + * the FlexSession representing the connection does not support push directly. + * This generally requires that the client and endpoint establish a separate, + * physical connection for pushed data that is part of a larger, logical + * connection/session. + * <p> + * When the endpoint establishes this physical streaming connection it will + * create an instance of this class, register it with the FlexClient and then + * wait on the public <code>pushNeeded</code> condition variable. + * When data arrives to push to the remote client, the FlexClient will queue it + * with this notifier instance and the waiting endpoint will be notified. + * The endpoint will retrieve the queued messages from the notifier instance and will + * stream them to the client and then go back into a wait state on the + * <code>pushNeeded</code> condition variable. + * </p><p> + * Note that this implementation is based upon <code>Object.wait()</code>; it is not a + * non-blocking implementation. + * </p> + */ +public class EndpointPushNotifier extends TimeoutAbstractObject implements EndpointPushHandler, FlexSessionListener, MessageClientListener +{ + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * Constructs a PushNotifier instance. + * + * @param endpoint The endpoint that will use this notifier. + * @param flexClient The FlexClient that will use this notifier. + */ + public EndpointPushNotifier(Endpoint endpoint, FlexClient flexClient) + { + notifierId = UUIDUtils.createUUID(false /* doesn't need to be secure */); + this.endpoint = endpoint; + this.flexClient = flexClient; + flexClient.registerEndpointPushHandler(this, endpoint.getId()); + flexSession = FlexContext.getFlexSession(); + if (flexSession != null) + flexSession.addSessionDestroyedListener(this); + invalidateMessageClientOnStreamingClose = (endpoint instanceof BaseStreamingHTTPEndpoint)? + ((BaseStreamingHTTPEndpoint)endpoint).isInvalidateMessageClientOnStreamingClose() : false; + updateLastUse(); // Initialize last use timestamp to construct time. + } + + //-------------------------------------------------------------------------- + // + // Public Variables + // + //-------------------------------------------------------------------------- + + /** + * The condition variable that the endpoint waits on for pushed data to arrive. + */ + public final Object pushNeeded = new Object(); + + //-------------------------------------------------------------------------- + // + // Private Variables + // + //-------------------------------------------------------------------------- + + /** + * Flag indicating whether the notifier has been closed/shut down. + * This is used to signal a waiting endpoint that it should break out of its + * wait loop and close its streaming connection. + */ + private volatile boolean closed; + + /** + * Flag indicating that the notifier has started closing; used to allow only + * one thread to execute the close() logic and delay flipping closed to true + * to allow any final messages to be streamed to the client before the endpoint + * using the notifier breaks out of its wait/notify loop and terminates the + * streaming connection. + */ + private volatile boolean closing; + + /** + * The number of minutes a client can remain idle before the server + * times the notifier out. + */ + private int idleTimeoutMinutes; + + /** + * Whether to invalidate the message-client when the streaming connection is closed. + */ + private final boolean invalidateMessageClientOnStreamingClose; + + /** + * The endpoint that uses this notifier. + */ + private final Endpoint endpoint; + + /** + * The FlexClient this notifier is associated with. + */ + private final FlexClient flexClient; + + /** + * The FlexSession this notifier is associated with. + */ + private final FlexSession flexSession; + + /** + * Lock for instance-level synchronization. + */ + private final Object lock = new Object(); + + /** + * Log category used by the notifier. Initialized to ENDPOINT_GENERAL but + * endpoints using this notifier should set it to their own log categories. + */ + private String logCategory = LogCategories.ENDPOINT_GENERAL; + + /** + * Queue of messages that the FlexClient will populate and the endpoint will drain to + * stream to the client. + */ + private List<AsyncMessage> messages; + + /** + * List of MessageClient subscriptions using this endpoint push notifier. + * When this notifier is closed, any associated subscriptions need to be invalidated. + */ + private final CopyOnWriteArrayList<MessageClient> messageClients = new CopyOnWriteArrayList<MessageClient>(); + + /** + * Unique identifier for this instance. + */ + private final String notifierId; + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Moves this notifier to a closed state, notifying any listeners, + * associated subscriptions and waiting endpoints. + * Does not attempt to notify the client Channel of the disconnect thereby allowing + * automatic reconnect processing to run. + */ + public void close() + { + close(false); + } + + /** + * Moves this notifier to a closed state, notifying any listeners, + * associated subscriptions and waiting endpoints. + * Attempts to notify the client Channel of an explicit disconnect, thereby suppressing + * automatic reconnect processing. + * + * @param disconnectChannel True to attempt to notify the client Channel of the disconnect + * and suppress automatic reconnect processing. + */ + public void close(boolean disconnectChannel) + { + synchronized (lock) + { + if (closed || closing) + return; + + closing = true; + } + + cancelTimeout(); + + if (flexSession != null) + flexSession.removeSessionDestroyedListener(this); + + // Shut down flow of further messages to this notifier. + flexClient.unregisterEndpointPushHandler(this, endpoint.getId()); + + // Push a disconnect command down to the client to suppress automatic reconnect. + if (disconnectChannel) + { + ArrayList<AsyncMessage> list = new ArrayList<AsyncMessage>(1); + CommandMessage disconnect = new CommandMessage(CommandMessage.DISCONNECT_OPERATION); + list.add(disconnect); + pushMessages(list); + } + + // Invalidate associated subscriptions; this doesn't attempt to notify the client. + // Any client subscriptions made over this endpoint will be automatically invalidated + // on the client when it receives its channel disconnect event. + if (invalidateMessageClientOnStreamingClose) + { + for (Iterator<MessageClient> iter = messageClients.iterator() ; iter.hasNext();) + iter.next().invalidate(); + } + + // Move to final closed state; after this we need to notify one last time to stream + // any final messages to the client and allow the endpoint to shut down its streaming + // connection. + synchronized (lock) + { + closed = true; + closing = false; + } + synchronized (pushNeeded) + { + pushNeeded.notifyAll(); + } + } + + /** + * Returns any messages available to push to the client, and removes them + * from this notifier. + * Notified endpoints should invoke this method to retrieve messages, stream them + * to the client and then re-enter the wait state. + * This method acquires a lock on <code>pushNeeded</code>. + * + * @return The messages to push to the client. + */ + public List<AsyncMessage> drainMessages() + { + synchronized (pushNeeded) + { + List<AsyncMessage> messagesToPush = messages; + messages = null; + return messagesToPush; + } + } + + /** + * Returns whether the notifier has closed; used to break the endpoint's wait cycle. + * + * @return True if the notifier has closed; otherwise false. + */ + public boolean isClosed() + { + return closed; + } + + /** + * Returns the endpoint that is using this notifier. + * + * @return The endpoint using this notifier. + */ + public Endpoint getEndpoint() + { + return endpoint; + } + + /** + * Returns the idle timeout minutes used by the notifier. + * + * @return The idle timeout minutes used by the notifier. + */ + public int getIdleTimeoutMinutes() + { + return idleTimeoutMinutes; + } + + /** + * Sets the idle timeout minutes used by the notifier. + * + * @param idleTimeoutMinutes The idle timeout minutes used by the notifier. + */ + public void setIdleTimeoutMinutes(int idleTimeoutMinutes) + { + this.idleTimeoutMinutes = idleTimeoutMinutes; + } + + /** + * Returns the log category used by this notifier. + * + * @return The log category used by this notifier. + */ + public String getLogCategory() + { + return logCategory; + } + + /** + * Sets the log category used by this notifier. Endpoints using this notifier + * should set it to their own log categories. + * + * @param logCategory The log category for the notifier to use. + */ + public void setLogCategory(String logCategory) + { + this.logCategory = logCategory; + } + + /** + * Returns the unique id for this notifier. + * + * @return The unique id for this notifier. + */ + public String getNotifierId() + { + return notifierId; + } + + /** + * @exclude + * Implements TimeoutCapable. + * Determine the time, in milliseconds, that this object is allowed to idle + * before having its timeout method invoked. + */ + public long getTimeoutPeriod() + { + return (idleTimeoutMinutes * 60 * 1000); + } + + /** + * @exclude + */ + public void messageClientCreated(MessageClient messageClient) + { + // No-op. + } + + /** + * @exclude + */ + public void messageClientDestroyed(MessageClient messageClient) + { + unregisterMessageClient(messageClient); + } + + /** + * Used by FlexClient to push messages to the endpoint. + * This method will automatically notify a waiting endpoint, if one exists + * and it acquires a lock on <code>pushNeeded</code>. + * + * @param messages The messages to push to the client. + */ + public void pushMessages(List messagesToPush) + { + if (!messagesToPush.isEmpty()) + { + synchronized (pushNeeded) + { + // Push these straight on through; notify immediately. + if (messages == null) + messages = messagesToPush; + else + messages.addAll(messagesToPush); + + // If the notifier isn't closing, notify; otherwise just add and the close will + // notify once it completes. + if (!closing) + pushNeeded.notifyAll(); + } + } + } + + /** + * Registers a MessageClient subscription that depends on this notifier. + * + * @param messageClient A MessageClient that depends on this notifier. + */ + public void registerMessageClient(MessageClient messageClient) + { + if (messageClient != null) + { + if (messageClients.addIfAbsent(messageClient)) + messageClient.addMessageClientDestroyedListener(this); + } + } + + /** + * Handle session creation events. This handler is a no-op because the notifier + * is only concerned with its associated session's destruction. + * + * @param flexSession The newly created FlexSession. + */ + public void sessionCreated(FlexSession flexSession) + { + // No-op. + } + + /** + * Handle session destruction events. This will be invoked when the notifier's + * associated session is invalidated, and this will trigger the notifier to close. + * + * @param flexSession The FlexSession being invalidated. + */ + public void sessionDestroyed(FlexSession flexSession) + { + if (Log.isInfo()) + Log.getLogger(logCategory).info("Endpoint with id '" + endpoint.getId() + "' is closing" + + " a streaming connection for the FlexClient with id '" + flexClient.getId() + "'" + + " since its associated session has been destroyed."); + close(true /* disconnect client Channel */); + } + + /** + * @exclude + * Implements TimeoutCapable. + * Inform the object that it has timed out. + */ + public void timeout() + { + if (Log.isInfo()) + Log.getLogger(logCategory).info("Endpoint with id '" + endpoint.getId() + "' is timing out" + + " a streaming connection for the FlexClient with id '" + flexClient.getId() + "'"); + close(true /* disconnect client Channel */); + } + + /** + * Unregisters a MessageClient subscription that depended on this notifier. + * + * @param messageClient A MessageClient that depended on this notifier. + */ + public void unregisterMessageClient(MessageClient messageClient) + { + if (messageClient != null) + { + messageClient.removeMessageClientDestroyedListener(this); + messageClients.remove(messageClient); + } + } } \ No newline at end of file
