http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/MessageException.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageException.java b/core/src/flex/messaging/MessageException.java new file mode 100644 index 0000000..f3b565a --- /dev/null +++ b/core/src/flex/messaging/MessageException.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +import java.util.Map; + +import flex.messaging.log.Log; +import flex.messaging.log.LogCategories; +import flex.messaging.log.LogEvent; +import flex.messaging.messages.ErrorMessage; +import flex.messaging.messages.Message; +import flex.messaging.util.ExceptionUtil; +import flex.messaging.util.PropertyStringResourceLoader; +import flex.messaging.util.ResourceLoader; +import flex.messaging.util.StringUtils; + +/** + * The MessageException class is the basic exception type used throughout + * the server. This class is extended to support more specific exception types. + */ +public class MessageException extends LocalizedException +{ + //-------------------------------------------------------------------------- + // + // Static Constants + // + //-------------------------------------------------------------------------- + + // Message exception code strings. + public static final String CODE_SERVER_RESOURCE_UNAVAILABLE = "Server.ResourceUnavailable"; + + + static final long serialVersionUID = 3310842114461162689L; + + //-------------------------------------------------------------------------- + // + // Constructors + // + //-------------------------------------------------------------------------- + + /** + * Default constructor. + */ + public MessageException() + { + } + + /** + * Construct a message specifying a ResourceLoader to be used to load localized strings. + * + * @param loader + */ + public MessageException(ResourceLoader loader) + { + super(loader); + } + + /** + * Constructor with a message. + * + * @param message The detailed message for the exception. + */ + public MessageException(String message) + { + setMessage(message); + } + + /** + * Constructs a new exception with the specified message and the <code>Throwable</code> as the root cause. + * + * @param message The detailed message for the exception. + * @param t The root cause of the exception. + */ + public MessageException(String message, Throwable t) + { + setMessage(message); + setRootCause(t); + } + + /** + * Constructs a new exception with the specified <code>Throwable</code> as the root cause. + * + * @param t The root cause of the exception. + */ + public MessageException(Throwable t) + { + String rootMessage = t.getMessage(); + if (rootMessage == null) + rootMessage = t.toString(); + setMessage(rootMessage); + setRootCause(t); + } + + //-------------------------------------------------------------------------- + // + // Properties + // + //-------------------------------------------------------------------------- + + //---------------------------------- + // code + //---------------------------------- + + + protected String code; + + /** + * Returns the code of the exception. + * + * @return Code of the exception. + */ + public String getCode() + { + return code; + } + + /** + * Sets the code of the exception. + * + * @param code Code of the exception. + */ + public void setCode(String code) + { + this.code = code; + } + + //---------------------------------- + // defaultLogMessageIntro + //---------------------------------- + + /** + * Returns the default initial text for use in the log output generated by <code>logAtHingePoint()</code>. + * + * @return the default initial text for use in the log output generated by <code>logAtHingePoint()</code>. + */ + public String getDefaultLogMessageIntro() + { + return "Error handling message: "; + } + + //---------------------------------- + // extendedData + //---------------------------------- + + + protected Map extendedData; + + /** + * Returns the extended data of the exception. + * + * @return The extended data of the exception. + */ + public Map getExtendedData() + { + return extendedData; + } + + /** + * Sets the extended data of the exception. + * + * @param extendedData The extended data of the exception. + */ + public void setExtendedData(Map extendedData) + { + this.extendedData = extendedData; + } + + //---------------------------------- + // errorMessage + //---------------------------------- + + + protected ErrorMessage errorMessage; + + /** + * Returns the error message of the exception. + * + * @return The error message of the exception. + */ + public ErrorMessage getErrorMessage() + { + if (errorMessage == null) + { + errorMessage = createErrorMessage(); + } + return errorMessage; + } + + /** + * Sets the error message of the exception. + * + * @param errorMessage The error message of the exception. + */ + public void setErrorMessage(ErrorMessage errorMessage) + { + this.errorMessage = errorMessage; + } + + //---------------------------------- + // logStackTraceEnabled + //---------------------------------- + + /** + * Indicates whether logging of this exception should include a full stack trace or not. + * Default is true. + * + * @see #logAtHingePoint(Message, ErrorMessage, String) + * @return true if the logging stack traces is enabled. + */ + public boolean isLogStackTraceEnabled() + { + return true; + } + + //---------------------------------- + // logged + //---------------------------------- + + protected boolean logged; + + /** + * Indicates whether this exception has already been logged + * by a call to <code>logAtHingPoint()</code>. + * Manual logging for an exception can use <code>setLogged(true)</code> + * to suppress any further automatic logging of the exception. + * + * @return true if the exception has been logged; otherwise false. + */ + public boolean isLogged() + { + return logged; + } + + /** + * Records whether this exception has been logged. + * + * @param value true if the exception has been logged; otherwise false. + */ + public void setLogged(boolean value) + { + logged = value; + } + + //---------------------------------- + // peferredLogLevel + //---------------------------------- + + /** + * Returns the preferred log level for this exception instance. + * The default value is <code>LogEvent.ERROR</code>. + * + * @see #logAtHingePoint(Message, ErrorMessage, String) + * @return the preffered log level. + */ + public short getPreferredLogLevel() + { + return LogEvent.ERROR; + } + + //---------------------------------- + // resourceLoader + //---------------------------------- + + /** + * Returns the <code>ResourceLoader</code> used to load localized strings. + * + * @return The <code>ResourceLoader</code> used to load localized strings. + */ + @Override protected ResourceLoader getResourceLoader() + { + if (resourceLoader == null) + { + try + { + MessageBroker broker = FlexContext.getMessageBroker(); + resourceLoader = broker != null? broker.getSystemSettings().getResourceLoader() + : new PropertyStringResourceLoader(); + } + catch (NoClassDefFoundError exception) // Could happen in client mode. + { + return new PropertyStringResourceLoader(); + } + } + + return resourceLoader; + } + + //---------------------------------- + // rootCauseErrorMessage + //---------------------------------- + + + public Object getRootCauseErrorMessage() + { + if (rootCause == null) + return null; + + // FIXME: serialize number field. + return rootCause instanceof MessageException? + ((MessageException)rootCause).createErrorMessage() : rootCause; + } + + //---------------------------------- + // statusCode + //---------------------------------- + + protected int statusCode; + + /** + * Returns the HTTP status code of the exception. + * + * @return The HTTP status code of the exception. + */ + public int getStatusCode() + { + return statusCode; + } + + /** + * Sets the HTTP status code of the exception. + * + * @param statusCode The HTTP status code of the exception. + */ + public void setStatusCode(int statusCode) + { + this.statusCode = statusCode; + } + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Creates an error message from the exception. + * + * @return The error message. + */ + public ErrorMessage createErrorMessage() + { + ErrorMessage msg = new ErrorMessage(); + msg.faultCode = code != null? code : "Server.Processing"; + msg.faultString = message; + msg.faultDetail = details; + msg.rootCause = getRootCauseErrorMessage(); + if (extendedData != null) + msg.extendedData = extendedData; + if (statusCode != 0) + msg.setHeader(Message.STATUS_CODE_HEADER, statusCode); + return msg; + } + + /** + * Invoked at hinge-points in server processing where catch-all exception logging is performed. + * This method uses <code>isLogged()</code> and <code>setLogged()</code> to avoid repeat logging + * of the same exception and uses <code>getPreferredLogLevel()</code> which may be + * overridden in subclasses to control the log level that the logging is output at. + * The underlying exception stack traces are also conditionally included in log output + * if the exception class allows it and this is determined by invoking <code>isLogStackTraceEnabled()</code> + * + * @param inboundMessage The inbound message that triggered an exception during processing. + * @param outboundMessage The outbound <code>ErrorMessage</code>, which may be null depending on whether it has been generated + * or not at the point this method is invoked. + * @param logMessageIntro The beginning text for the log message, which may be null; default value is returned by <code>getDefaultLogMessageIntro()</code>. + */ + public void logAtHingePoint(Message inboundMessage, ErrorMessage outboundMessage, String logMessageIntro) + { + if (!isLogged()) + { + setLogged(true); + + short preferredLevel = getPreferredLogLevel(); + // If the preferred level is less than the current Log level; return early. + if (preferredLevel < Log.getTargetLevel()) + return; + + // Construct core log output. + StringBuffer output = new StringBuffer(); + output.append((logMessageIntro != null) ? logMessageIntro : getDefaultLogMessageIntro()); + output.append(this.toString()); + output.append(StringUtils.NEWLINE); + output.append(" incomingMessage: "); + output.append(inboundMessage); + output.append(StringUtils.NEWLINE); + if (outboundMessage != null) + { + output.append(" errorReply: "); + output.append(outboundMessage); + output.append(StringUtils.NEWLINE); + } + if (isLogStackTraceEnabled()) + { + output.append(ExceptionUtil.exceptionFollowedByRootCausesToString(this)); + output.append(StringUtils.NEWLINE); + } + + switch (preferredLevel) + { + case LogEvent.FATAL: + { + Log.getLogger(LogCategories.MESSAGE_GENERAL).fatal(output.toString()); + break; + } + case LogEvent.ERROR: + { + Log.getLogger(LogCategories.MESSAGE_GENERAL).error(output.toString()); + break; + } + case LogEvent.WARN: + { + Log.getLogger(LogCategories.MESSAGE_GENERAL).warn(output.toString()); + break; + } + case LogEvent.INFO: + { + Log.getLogger(LogCategories.MESSAGE_GENERAL).info(output.toString()); + break; + } + case LogEvent.DEBUG: + { + Log.getLogger(LogCategories.MESSAGE_GENERAL).debug(output.toString()); + break; + } + default: + { + Log.getLogger(LogCategories.MESSAGE_GENERAL).fatal("Failed to log exception for handling message due to an invalid preferred log level: " + preferredLevel); + break; + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/MessageRoutedEvent.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageRoutedEvent.java b/core/src/flex/messaging/MessageRoutedEvent.java new file mode 100644 index 0000000..397b261 --- /dev/null +++ b/core/src/flex/messaging/MessageRoutedEvent.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +import flex.messaging.messages.Message; + +import java.util.EventObject; + +/** + * + * This event indicates that the source message has been routed to the outbound message queues + * for all target clients. + * This can be used as the trigger for performing optimized IO to flush these queued messages to + * remote hosts over the network. + */ +public class MessageRoutedEvent extends EventObject +{ + /** + * + */ + private static final long serialVersionUID = -3063794416424805005L; + + /** + * Constructs a new <tt>MessageRoutedEvent</tt> using the supplied source <tt>Message</tt>. + * + * @param message The message that has been routed. + */ + public MessageRoutedEvent(Message message) + { + super(message); + } + + /** + * Returns the message that has been routed. + */ + public Message getMessage() + { + return (Message)getSource(); + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/MessageRoutedListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageRoutedListener.java b/core/src/flex/messaging/MessageRoutedListener.java new file mode 100644 index 0000000..bf9a171 --- /dev/null +++ b/core/src/flex/messaging/MessageRoutedListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +import java.util.EventListener; + +/** + * + * Provides notification for multicast message routing events to support optimized + * asynchronous IO to the target remote hosts. + */ +public interface MessageRoutedListener extends EventListener +{ + /** + * Invoked when a message has been routed to the outbound queues for all target + * clients. + * + * @param event The event containing the source message. + */ + void messageRouted(MessageRoutedEvent event); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/MessageRoutedNotifier.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/MessageRoutedNotifier.java b/core/src/flex/messaging/MessageRoutedNotifier.java new file mode 100644 index 0000000..f9aca87 --- /dev/null +++ b/core/src/flex/messaging/MessageRoutedNotifier.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +import flex.messaging.messages.Message; + +import java.util.ArrayList; + +/** + * + * Supports registration and notification of <tt>MessageRoutedListener</tt>s. + * An instance of this class is exposed by <tt>FlexContext</tt> while a message is + * being routed, and once routing of the message to the outbound messages queues for + * target clients and registered listeners are notified. + * This class performs no synchronization because it is only used within the context + * of a single Thread, and only during the routing of a single message. + */ +public class MessageRoutedNotifier +{ + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * Constructs a <tt>MessageRoutedNotifier</tt> for the supplied source message. + * + * @param The source message being routed. + */ + public MessageRoutedNotifier(Message message) + { + this.message = message; + } + + //-------------------------------------------------------------------------- + // + // Variables + // + //-------------------------------------------------------------------------- + + /** + * The source message being routed. + */ + private final Message message; + + //-------------------------------------------------------------------------- + // + // Properties + // + //-------------------------------------------------------------------------- + + //---------------------------------- + // messageRoutedListeners + //---------------------------------- + + private ArrayList listeners; + + /** + * Adds a <tt>MessageRoutedListener</tt>. + */ + public void addMessageRoutedListener(MessageRoutedListener listener) + { + if (listener != null) + { + // Lazy-init only if necessary. + if (listeners == null) + listeners = new ArrayList(); + + // Add if absent. + if (!listeners.contains(listener)) + listeners.add(listener); + } + } + + /** + * Removes a <tt>MessageRoutedListener</tt>. + */ + public void removeMessageRoutedListener(MessageRoutedListener listener) + { + if ((listener != null) && (listeners != null)) + listeners.remove(listener); + } + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Notifies registered listeners of a routed message. + * + * @param message The message that has been routed. + */ + public void notifyMessageRouted() + { + if ((listeners != null) && !listeners.isEmpty()) + { + MessageRoutedEvent event = new MessageRoutedEvent(message); + int n = listeners.size(); + for (int i = 0; i < n; ++i) + ((MessageRoutedListener)listeners.get(i)).messageRouted(event); + } + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/Server.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/Server.java b/core/src/flex/messaging/Server.java new file mode 100644 index 0000000..dd6bf25 --- /dev/null +++ b/core/src/flex/messaging/Server.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +/** + * The interface for a shared server instance that may be associated with a + * <tt>MessageBroker</tt> and used by endpoints. + */ +public interface Server extends FlexComponent +{ + /** + * Returns the id for the server. + * Endpoints can lookup server instances that have been associated with a <tt>MessageBroker</tt> using {@link MessageBroker#getServer(String)}. + */ + String getId(); + + /** + * Sets the id for the server. + */ + void setId(String value); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/ServiceValidationListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/ServiceValidationListener.java b/core/src/flex/messaging/ServiceValidationListener.java new file mode 100644 index 0000000..d7f24f8 --- /dev/null +++ b/core/src/flex/messaging/ServiceValidationListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +/** + * + */ +public interface ServiceValidationListener +{ + /** + * This method gets called before any other processing of the describeServices method. + * It allows a hook for external systems that need to update the service destinations at runtime. + */ + void validateServices(); + void validateDestination(String destination); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/VersionInfo.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/VersionInfo.java b/core/src/flex/messaging/VersionInfo.java new file mode 100644 index 0000000..0d60e4d --- /dev/null +++ b/core/src/flex/messaging/VersionInfo.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging; + +import flex.messaging.util.StringUtils; + + +/** + * Class representing the build version of Data Services. + */ +public class VersionInfo +{ + //Cache this info as it should not change during the time class is loaded + public static String BUILD_MESSAGE; + public static String BUILD_NUMBER_STRING; + public static String BUILD_TITLE; + public static long BUILD_NUMBER; + + private static final String LCDS_CLASS = "flex.data.DataService"; + + public static String buildMessage() + { + if (BUILD_MESSAGE == null) + { + try + { + //Ensure we've parsed build info + getBuild(); + + if (StringUtils.isEmpty(BUILD_NUMBER_STRING)) + { + BUILD_MESSAGE = BUILD_TITLE; + } + else + { + BUILD_MESSAGE = BUILD_TITLE + ": " + BUILD_NUMBER_STRING; + } + } + catch (Throwable t) + { + BUILD_MESSAGE = BUILD_TITLE +": information unavailable"; + } + } + + return BUILD_MESSAGE; + } + + public static long getBuildAsLong() + { + if (BUILD_NUMBER == 0) + { + getBuild(); + + if (BUILD_NUMBER_STRING != null && !BUILD_NUMBER_STRING.equals("")) + { + try + { + BUILD_NUMBER = Long.parseLong(BUILD_NUMBER_STRING); + } + catch (NumberFormatException nfe) + { + // ignore, just return 0 + } + } + } + + return BUILD_NUMBER; + } + + public static String getBuild() + { + if (BUILD_NUMBER_STRING == null) + { + Class classToUseForManifest; + + try + { + classToUseForManifest = Class.forName(LCDS_CLASS); + } + catch (ClassNotFoundException e) + { + classToUseForManifest = VersionInfo.class; + } + + try + { + BUILD_NUMBER_STRING = ""; + Package pack = classToUseForManifest.getPackage(); + BUILD_NUMBER_STRING = pack.getImplementationVersion(); + BUILD_TITLE = pack.getImplementationTitle(); + } + catch (Throwable t) + { + // ignore, just return empty string + } + } + + return BUILD_NUMBER_STRING; + } +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/AsyncPollHandler.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/AsyncPollHandler.java b/core/src/flex/messaging/client/AsyncPollHandler.java new file mode 100644 index 0000000..2817419 --- /dev/null +++ b/core/src/flex/messaging/client/AsyncPollHandler.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +/** + * Defines the interface to handle asynchronous poll results. + */ +public interface AsyncPollHandler +{ + /** + * Invoked by the <tt>FlexClient</tt> when an asynchronous poll result is available. + * + * @param flushResult The flush result containing messages to return in the poll response and + * an optional wait time before the client should issue its next poll. + */ + void asyncPollComplete(FlushResult flushResult); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/EndpointPushHandler.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/EndpointPushHandler.java b/core/src/flex/messaging/client/EndpointPushHandler.java new file mode 100644 index 0000000..8e9c19e --- /dev/null +++ b/core/src/flex/messaging/client/EndpointPushHandler.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +import java.util.List; + +import flex.messaging.MessageClient; +import flex.messaging.messages.Message; + +/** + * Defines the interface for a handler that may be registered by an endpoint with a <tt>FlexClient</tt> in order + * to push messages to a connected client. + */ +public interface EndpointPushHandler +{ + /** + * Invoked to shut down the handler. + * It may be invoked by the endpoint when the underlying connection it manages to the client closes, + * or by the <tt>FlexClient</tt> if it is invalidated. + * The implementation of this method should release any resources, and should not attempt to notify the + * client of an explicit disconnect. + * + * @see #close(boolean) + */ + void close(); + + /** + * Invoked to shut down the handler. + * It may be invoked by the endpoint when the underlying connection it manages to the client closes, + * or by the <tt>FlexClient</tt> if it is invalidated. + * The implementation of this method should release any resources, and may attempt to notify the client + * Channel that it has been disconnected in order to suppress automatic reconnect behavior. + * + * @param disconnectChannel True to attempt to notify the client of an explicit disconnect in order to + * suppress automatic reconnect behavior. + */ + void close(boolean disconnectChannel); + + /** + * Invoked by the <tt>FlexClient</tt> when it has messages to push to + * the client. + * + * @param messagesToPush The list of messages to push. + */ + void pushMessages(List<Message> messagesToPush); + + /** + * Invoked to notify the handler that the <tt>MessageClient</tt> subscription is using this handler. + * If subscriptions should be invalidated if the handler is closed, it should retain references to + * all registered <tt>MessageClient</tt> instances and invalidate them when it closes. + * + * @param messageClient The <tt>MessageClient</tt> subscription using this handler. + */ + void registerMessageClient(MessageClient messageClient); + + /** + * Invoked to notify the handler that a <tt>MessageClient</tt> subscription that was using it has + * been invalidated. + * If the handler is tracking the set of <tt>MessageClient</tt> instances that are using it, the handler should + * remove the instance from its set. + * + * @param messageClient The <tt>MessageClient</tt> subscription no longer using this handler. + */ + void unregisterMessageClient(MessageClient messageClient); +} http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/EndpointPushNotifier.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/EndpointPushNotifier.java b/core/src/flex/messaging/client/EndpointPushNotifier.java new file mode 100644 index 0000000..cd6db61 --- /dev/null +++ b/core/src/flex/messaging/client/EndpointPushNotifier.java @@ -0,0 +1,461 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package flex.messaging.client; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import flex.messaging.FlexContext; +import flex.messaging.FlexSession; +import flex.messaging.FlexSessionListener; +import flex.messaging.MessageClient; +import flex.messaging.MessageClientListener; +import flex.messaging.endpoints.BaseStreamingHTTPEndpoint; +import flex.messaging.endpoints.Endpoint; +import flex.messaging.log.Log; +import flex.messaging.log.LogCategories; +import flex.messaging.messages.AsyncMessage; +import flex.messaging.messages.CommandMessage; +import flex.messaging.util.TimeoutAbstractObject; +import flex.messaging.util.UUIDUtils; + +/** + * + * Instances of this class are used by endpoints that support streaming + * outbound data to connected clients when the client is not polling and + * the FlexSession representing the connection does not support push directly. + * This generally requires that the client and endpoint establish a separate, + * physical connection for pushed data that is part of a larger, logical + * connection/session. + * <p> + * When the endpoint establishes this physical streaming connection it will + * create an instance of this class, register it with the FlexClient and then + * wait on the public <code>pushNeeded</code> condition variable. + * When data arrives to push to the remote client, the FlexClient will queue it + * with this notifier instance and the waiting endpoint will be notified. + * The endpoint will retrieve the queued messages from the notifier instance and will + * stream them to the client and then go back into a wait state on the + * <code>pushNeeded</code> condition variable. + * </p><p> + * Note that this implementation is based upon <code>Object.wait()</code>; it is not a + * non-blocking implementation. + * </p> + */ +public class EndpointPushNotifier extends TimeoutAbstractObject implements EndpointPushHandler, FlexSessionListener, MessageClientListener +{ + //-------------------------------------------------------------------------- + // + // Constructor + // + //-------------------------------------------------------------------------- + + /** + * Constructs a PushNotifier instance. + * + * @param endpoint The endpoint that will use this notifier. + * @param flexClient The FlexClient that will use this notifier. + */ + public EndpointPushNotifier(Endpoint endpoint, FlexClient flexClient) + { + notifierId = UUIDUtils.createUUID(false /* doesn't need to be secure */); + this.endpoint = endpoint; + this.flexClient = flexClient; + flexClient.registerEndpointPushHandler(this, endpoint.getId()); + flexSession = FlexContext.getFlexSession(); + if (flexSession != null) + flexSession.addSessionDestroyedListener(this); + invalidateMessageClientOnStreamingClose = (endpoint instanceof BaseStreamingHTTPEndpoint)? + ((BaseStreamingHTTPEndpoint)endpoint).isInvalidateMessageClientOnStreamingClose() : false; + updateLastUse(); // Initialize last use timestamp to construct time. + } + + //-------------------------------------------------------------------------- + // + // Public Variables + // + //-------------------------------------------------------------------------- + + /** + * The condition variable that the endpoint waits on for pushed data to arrive. + */ + public final Object pushNeeded = new Object(); + + //-------------------------------------------------------------------------- + // + // Private Variables + // + //-------------------------------------------------------------------------- + + /** + * Flag indicating whether the notifier has been closed/shut down. + * This is used to signal a waiting endpoint that it should break out of its + * wait loop and close its streaming connection. + */ + private volatile boolean closed; + + /** + * Flag indicating that the notifier has started closing; used to allow only + * one thread to execute the close() logic and delay flipping closed to true + * to allow any final messages to be streamed to the client before the endpoint + * using the notifier breaks out of its wait/notify loop and terminates the + * streaming connection. + */ + private volatile boolean closing; + + /** + * The number of minutes a client can remain idle before the server + * times the notifier out. + */ + private int idleTimeoutMinutes; + + /** + * Whether to invalidate the message-client when the streaming connection is closed. + */ + private final boolean invalidateMessageClientOnStreamingClose; + + /** + * The endpoint that uses this notifier. + */ + private final Endpoint endpoint; + + /** + * The FlexClient this notifier is associated with. + */ + private final FlexClient flexClient; + + /** + * The FlexSession this notifier is associated with. + */ + private final FlexSession flexSession; + + /** + * Lock for instance-level synchronization. + */ + private final Object lock = new Object(); + + /** + * Log category used by the notifier. Initialized to ENDPOINT_GENERAL but + * endpoints using this notifier should set it to their own log categories. + */ + private String logCategory = LogCategories.ENDPOINT_GENERAL; + + /** + * Queue of messages that the FlexClient will populate and the endpoint will drain to + * stream to the client. + */ + private List<AsyncMessage> messages; + + /** + * List of MessageClient subscriptions using this endpoint push notifier. + * When this notifier is closed, any associated subscriptions need to be invalidated. + */ + private final CopyOnWriteArrayList<MessageClient> messageClients = new CopyOnWriteArrayList<MessageClient>(); + + /** + * Unique identifier for this instance. + */ + private final String notifierId; + + //-------------------------------------------------------------------------- + // + // Public Methods + // + //-------------------------------------------------------------------------- + + /** + * Moves this notifier to a closed state, notifying any listeners, + * associated subscriptions and waiting endpoints. + * Does not attempt to notify the client Channel of the disconnect thereby allowing + * automatic reconnect processing to run. + */ + public void close() + { + close(false); + } + + /** + * Moves this notifier to a closed state, notifying any listeners, + * associated subscriptions and waiting endpoints. + * Attempts to notify the client Channel of an explicit disconnect, thereby suppressing + * automatic reconnect processing. + * + * @param disconnectChannel True to attempt to notify the client Channel of the disconnect + * and suppress automatic reconnect processing. + */ + public void close(boolean disconnectChannel) + { + synchronized (lock) + { + if (closed || closing) + return; + + closing = true; + } + + cancelTimeout(); + + if (flexSession != null) + flexSession.removeSessionDestroyedListener(this); + + // Shut down flow of further messages to this notifier. + flexClient.unregisterEndpointPushHandler(this, endpoint.getId()); + + // Push a disconnect command down to the client to suppress automatic reconnect. + if (disconnectChannel) + { + ArrayList<AsyncMessage> list = new ArrayList<AsyncMessage>(1); + CommandMessage disconnect = new CommandMessage(CommandMessage.DISCONNECT_OPERATION); + list.add(disconnect); + pushMessages(list); + } + + // Invalidate associated subscriptions; this doesn't attempt to notify the client. + // Any client subscriptions made over this endpoint will be automatically invalidated + // on the client when it receives its channel disconnect event. + if (invalidateMessageClientOnStreamingClose) + { + for (Iterator<MessageClient> iter = messageClients.iterator() ; iter.hasNext();) + iter.next().invalidate(); + } + + // Move to final closed state; after this we need to notify one last time to stream + // any final messages to the client and allow the endpoint to shut down its streaming + // connection. + synchronized (lock) + { + closed = true; + closing = false; + } + synchronized (pushNeeded) + { + pushNeeded.notifyAll(); + } + } + + /** + * Returns any messages available to push to the client, and removes them + * from this notifier. + * Notified endpoints should invoke this method to retrieve messages, stream them + * to the client and then re-enter the wait state. + * This method acquires a lock on <code>pushNeeded</code>. + * + * @return The messages to push to the client. + */ + public List<AsyncMessage> drainMessages() + { + synchronized (pushNeeded) + { + List<AsyncMessage> messagesToPush = messages; + messages = null; + return messagesToPush; + } + } + + /** + * Returns whether the notifier has closed; used to break the endpoint's wait cycle. + * + * @return True if the notifier has closed; otherwise false. + */ + public boolean isClosed() + { + return closed; + } + + /** + * Returns the endpoint that is using this notifier. + * + * @return The endpoint using this notifier. + */ + public Endpoint getEndpoint() + { + return endpoint; + } + + /** + * Returns the idle timeout minutes used by the notifier. + * + * @return The idle timeout minutes used by the notifier. + */ + public int getIdleTimeoutMinutes() + { + return idleTimeoutMinutes; + } + + /** + * Sets the idle timeout minutes used by the notifier. + * + * @param idleTimeoutMinutes The idle timeout minutes used by the notifier. + */ + public void setIdleTimeoutMinutes(int idleTimeoutMinutes) + { + this.idleTimeoutMinutes = idleTimeoutMinutes; + } + + /** + * Returns the log category used by this notifier. + * + * @return The log category used by this notifier. + */ + public String getLogCategory() + { + return logCategory; + } + + /** + * Sets the log category used by this notifier. Endpoints using this notifier + * should set it to their own log categories. + * + * @param logCategory The log category for the notifier to use. + */ + public void setLogCategory(String logCategory) + { + this.logCategory = logCategory; + } + + /** + * Returns the unique id for this notifier. + * + * @return The unique id for this notifier. + */ + public String getNotifierId() + { + return notifierId; + } + + /** + * + * Implements TimeoutCapable. + * Determine the time, in milliseconds, that this object is allowed to idle + * before having its timeout method invoked. + */ + public long getTimeoutPeriod() + { + return (idleTimeoutMinutes * 60 * 1000); + } + + /** + * + */ + public void messageClientCreated(MessageClient messageClient) + { + // No-op. + } + + /** + * + */ + public void messageClientDestroyed(MessageClient messageClient) + { + unregisterMessageClient(messageClient); + } + + /** + * Used by FlexClient to push messages to the endpoint. + * This method will automatically notify a waiting endpoint, if one exists + * and it acquires a lock on <code>pushNeeded</code>. + * + * @param messages The messages to push to the client. + */ + public void pushMessages(List messagesToPush) + { + if (!messagesToPush.isEmpty()) + { + synchronized (pushNeeded) + { + // Push these straight on through; notify immediately. + if (messages == null) + messages = messagesToPush; + else + messages.addAll(messagesToPush); + + // If the notifier isn't closing, notify; otherwise just add and the close will + // notify once it completes. + if (!closing) + pushNeeded.notifyAll(); + } + } + } + + /** + * Registers a MessageClient subscription that depends on this notifier. + * + * @param messageClient A MessageClient that depends on this notifier. + */ + public void registerMessageClient(MessageClient messageClient) + { + if (messageClient != null) + { + if (messageClients.addIfAbsent(messageClient)) + messageClient.addMessageClientDestroyedListener(this); + } + } + + /** + * Handle session creation events. This handler is a no-op because the notifier + * is only concerned with its associated session's destruction. + * + * @param flexSession The newly created FlexSession. + */ + public void sessionCreated(FlexSession flexSession) + { + // No-op. + } + + /** + * Handle session destruction events. This will be invoked when the notifier's + * associated session is invalidated, and this will trigger the notifier to close. + * + * @param flexSession The FlexSession being invalidated. + */ + public void sessionDestroyed(FlexSession flexSession) + { + if (Log.isInfo()) + Log.getLogger(logCategory).info("Endpoint with id '" + endpoint.getId() + "' is closing" + + " a streaming connection for the FlexClient with id '" + flexClient.getId() + "'" + + " since its associated session has been destroyed."); + close(true /* disconnect client Channel */); + } + + /** + * + * Implements TimeoutCapable. + * Inform the object that it has timed out. + */ + public void timeout() + { + if (Log.isInfo()) + Log.getLogger(logCategory).info("Endpoint with id '" + endpoint.getId() + "' is timing out" + + " a streaming connection for the FlexClient with id '" + flexClient.getId() + "'"); + close(true /* disconnect client Channel */); + } + + /** + * Unregisters a MessageClient subscription that depended on this notifier. + * + * @param messageClient A MessageClient that depended on this notifier. + */ + public void unregisterMessageClient(MessageClient messageClient) + { + if (messageClient != null) + { + messageClient.removeMessageClientDestroyedListener(this); + messageClients.remove(messageClient); + } + } +} \ No newline at end of file
