bloritsch 2003/02/10 20:40:27 Modified: event build.xml default.properties event/src/java/org/apache/excalibur/event Queue.java Sink.java event/src/java/org/apache/excalibur/event/impl AbstractQueue.java DefaultQueue.java FixedSizeQueue.java event/src/test/org/apache/excalibur/event/test AbstractQueueTestCase.java FixedSizeQueueTestCase.java Added: . check-targets.ent check-targets.properties event/src/java/org/apache/excalibur/event DequeueInterceptor.java EnqueuePredicate.java event/src/java/org/apache/excalibur/event/impl LossyMultiCastSink.java MultiCastSink.java NullDequeueInterceptor.java NullEnqueuePredicate.java RateLimitingPredicate.java ThresholdEnqueuePredicate.java Removed: event/src/silk/java/org/apache/excalibur/event/ext AbstractQueue.java AbstractSimpleSink.java AbstractSink.java DefaultDequeueInterceptor.java DefaultPredicate.java DefaultQueue.java DequeueInterceptor.java DequeueInterceptorSource.java EnqueuePredicate.java EnqueuePredicateSink.java FixedSizeQueue.java LossyMultiCastSink.java MultiCastSink.java RateLimitingPredicate.java ThresholdPredicate.java Log: merge important parts of silk proposal into event, the rest will go in the SEDA project Revision Changes Path 1.1 avalon-excalibur/check-targets.ent Index: check-targets.ent =================================================================== <!-- This build.xml snippet contains targets for ant 1.5 that check whether a specific libary is present, and offers targets for downloading them if they're not. It's kinda of a replacement for some maven functionality until maven becomes stable. Usage: 1) Copy this file to somewhere in your project. 2) Add the following to the top of your project's Ant build.xml script (adjusting the path): <!DOCTYPE project [ <!ENTITY check-targets SYSTEM "file:./check-targets.ent"> ]> 3) Before the closing '</project>' in your build.xml, add this: &check-targets; 4) define the properties - lib.dir - project.class.path - Name This is like expanding a macro: it pulls in the contents of this file. A minimal build.xml would thus be: <!DOCTYPE project [ <!ENTITY check-targets SYSTEM "file:./check-targets.ent"> ]> <project default="check-all-dependencies"> <property name="Name" value="My Project"/> <property name="lib.dir" value="lib"/> <path id="project.class.path"> <fileset dir="${lib.dir}"> <include name="*.jar" /> </fileset> </path> &check-targets; </project> --> <!-- Set up properties --> <property file="check-targets.properties"/> <property file="../check-targets.properties"/> <target name="setup-dependencies"> <available property="bsf.present" classname="com.ibm.bsf.BSFEngine" classpathref="project.class.path" /> <available property="checkstyle.present" classname="com.puppycrawl.tools.checkstyle.Checker" classpathref="project.class.path" /> <available property="commons-logging.present" classname="org.apache.comons.logging.Log" classpathref="project.class.path" /> <available property="javax.jms.present" classname="javax.jms.TopicConnection" classpathref="project.class.path" /> <available property="javax.mail.present" classname="javax.mail.Message" classpathref="project.class.path" /> <available property="javax.servlet.present" classname="javax.servlet.ServletContext" classpathref="project.class.path" /> <available property="javax.sql.present" classname="javax.sql.DataSource" classpathref="project.class.path" /> <available property="jdk14.present" classname="java.util.logging.Logger" classpathref="project.class.path" /> <available property="junit.present" classname="junit.framework.Test" classpathref="project.class.path" /> <available property="log4j.present" classname="org.apache.log4j.Level" classpathref="project.class.path" /> <available property="logkit.present" classname="org.apache.log.Hierarchy" classpathref="project.class.path" /> <available property="rhino.present" classname="org.mozilla.javascript.Parser" classpathref="project.class.path"/> </target> <!-- called if a dependency is missing which has a BSD or ASL or similar license for which autodownload is okay. --> <target name="missing-dependency"> <echo> ************************************************************************* Classes needed for compiling ${Name} against the ${id} API are not available. The build may fail or some functionality may not be available. Recovery: Run the build target import-${id} and it will download the needed jar - you should be online for that. If you have a fast connection you can also just run the build target import-all-auto-dependencies, which will download all external jars this buildfile knows about. Note this may download unused jars as well. ************************************************************************* </echo> <echo/> </target> <!-- called if a dependency is missing which we cannot autodownload due to licensing--> <target name="missing-dependency-noauto"> <echo> ************************************************************************* Classes needed for compiling ${Name} against the ${id} API are not available. The build may fail or some functionality may not be available. Recovery: Get the ${id} jar from the ${id} distribution at ${dist.url} and place it in ${lib.dir}. Please note that ${id} is licensed under the ${license} and that by downloading it you are agreeing to that license. You can read this license at ${license.url} ************************************************************************* </echo> <echo/> </target> <!-- this target fetches a file from a maven repository --> <target name="import-dependency"> <get src="${license.url}" dest="${lib.dir}/${id}.LICENSE.html" usetimestamp="true"/> <loadfile property="license.text" srcFile="${lib.dir}/${id}.LICENSE.html"/> <echo> ************************************************************************* You have requested to download the ${id} jar, which is licensed under the ${license}. A copy of this license has been saved to ${lib.dir}/${id}.LICENSE.html Please view it now. ************************************************************************* </echo> <input message="Do you agree to the terms of this license?" validargs="y,n" addproperty="do.download"/> <condition property="do.abort"> <equals arg1="n" arg2="${do.download}"/> </condition> <fail if="do.abort">Download aborted.</fail> <get src="${jar.repository}/${id}/jars/${jar.id}-${version}.jar" dest="${lib.dir}/${id}-${version}.jar" verbose="true" usetimestamp="true"/> </target> <target name="check-all-dependencies" depends="bsf-check,checkstyle-check,javamail-check,jdbc-check,jms-check,junit-check,log4j-check,logkit-check,servletapi-check,commons-logging-check" description="checks for all dependencies we know about. This is usually more than the minimum needed to build this project; actual use is not recommended."/> <target name="import-all-auto-dependencies" depends="import-bsf,import-checkstyle,import-junit,import-log4j,import-logkit,import-commons-logging" description="checks for all dependencies we know about. This is usually more than the minimum needed to build this project; actual use is not recommended."/> <!-- BSF --> <target name="bsf-check" unless="bsf.present" depends="setup-dependencies"> <antcall target="missing-dependency"> <param name="id" value="${bsf.id}"/> <param name="jar.id" value="${bsf.id}"/> <param name="version" value="${bsf.version}"/> <param name="license" value="${bsf.license}"/> <param name="license.url" value="${bsf.license.url}"/> </antcall> </target> <target name="import-bsf" depends="setup-dependencies"> <antcall target="import-dependency"> <param name="id" value="${bsf.id}"/> <param name="jar.id" value="${bsf.id}"/> <param name="version" value="${bsf.version}"/> <param name="license" value="${bsf.license}"/> <param name="license.url" value="${bsf.license.url}"/> </antcall> </target> <!-- Checkstyle --> <target name="checkstyle-check" unless="checkstyle.present" depends="setup-dependencies"> <antcall target="missing-dependency"> <param name="id" value="${checkstyle.id}"/> <param name="jar.id" value="${checkstyle.id}"/> <param name="version" value="${checkstyle.version}"/> <param name="license" value="${checkstyle.license}"/> <param name="license.url" value="${checkstyle.license.url}"/> </antcall> </target> <target name="import-checkstyle" depends="setup-dependencies"> <antcall target="import-dependency"> <param name="id" value="${checkstyle.id}"/> <param name="jar.id" value="${checkstyle.id}"/> <param name="version" value="${checkstyle.version}"/> <param name="license" value="${checkstyle.license}"/> <param name="license.url" value="${checkstyle.license.url}"/> </antcall> </target> <!-- Commons-Logging --> <target name="commons-logging-check" unless="commons-logging.present" depends="setup-dependencies"> <antcall target="missing-dependency"> <param name="id" value="${commons-logging.id}"/> <param name="jar.id" value="${commons-logging.id}"/> <param name="version" value="${commons-logging.version}"/> <param name="license" value="${commons-logging.license}"/> <param name="license.url" value="${commons-logging.license.url}"/> </antcall> </target> <target name="import-commons-logging" depends="setup-dependencies"> <antcall target="import-dependency"> <param name="id" value="${commons-logging.id}"/> <param name="jar.id" value="${commons-logging.id}"/> <param name="version" value="${commons-logging.version}"/> <param name="license" value="${commons-logging.license}"/> <param name="license.url" value="${commons-logging.license.url}"/> </antcall> </target> <!-- JavaMail --> <target name="javamail-check" unless="javax.mail.present" depends="setup-dependencies"> <antcall target="missing-dependency-noauto"> <param name="id" value="${javamail.id}"/> <param name="jar.id" value="${javamail.id}"/> <param name="version" value="${javamail.version}"/> <param name="license" value="${javamail.license}"/> <param name="license.url" value="${javamail.license.url}"/> <param name="dist.url" value="${javamail.dist.url}"/> </antcall> </target> <!-- JDBC --> <target name="jdbc-check" unless="javax.sql.present" depends="setup-dependencies"> <antcall target="missing-dependency-noauto"> <param name="id" value="${jdbc.id}"/> <param name="jar.id" value="${jdbc.id}"/> <param name="version" value="${jl.version}"/> <param name="license" value="${jdbc.license}"/> <param name="license.url" value="${jdbc.license.url}"/> <param name="dist.url" value="${jdbc.dist.url}"/> </antcall> </target> <!-- JDK 1.4 --> <target name="jdk14-check" unless="jdk14.present" depends="setup-dependencies"> <echo>*************************************************************************</echo> <echo>*</echo> <echo>* You are compiling with a pre-1.4 jdk. Some functionality may not be</echo> <echo>* available.</echo> <echo>*************************************************************************</echo> <echo/> </target> <!-- JMS --> <target name="jms-check" unless="javax.jms.present" depends="setup-dependencies"> <antcall target="missing-dependency-noauto"> <param name="id" value="${jms.id}"/> <param name="jar.id" value="${jms.id}"/> <param name="version" value="${jms.version}"/> <param name="license" value="${jms.license}"/> <param name="license.url" value="${jms.license.url}"/> <param name="dist.url" value="${jms.dist.url}"/> </antcall> </target> <!-- JUnit --> <target name="junit-check" unless="junit.present" depends="setup-dependencies"> <antcall target="missing-dependency"> <param name="id" value="${junit.id}"/> <param name="jar.id" value="${junit.id}"/> <param name="version" value="${junit.version}"/> <param name="license" value="${junit.license}"/> <param name="license.url" value="${junit.license.url}"/> </antcall> </target> <target name="import-junit" depends="setup-dependencies"> <antcall target="import-dependency"> <param name="id" value="${junit.id}"/> <param name="jar.id" value="${junit.id}"/> <param name="version" value="${junit.version}"/> <param name="license" value="${junit.license}"/> <param name="license.url" value="${junit.license.url}"/> </antcall> </target> <!-- Log4J --> <target name="log4j-check" unless="log4j.present" depends="setup-dependencies"> <antcall target="missing-dependency"> <param name="id" value="${log4j.id}"/> <param name="jar.id" value="${log4j.id}"/> <param name="version" value="${log4j.version}"/> <param name="license" value="${log4j.license}"/> <param name="license.url" value="${log4j.license.url}"/> </antcall> </target> <target name="import-log4j" depends="setup-dependencies"> <antcall target="import-dependency"> <param name="id" value="${log4j.id}"/> <param name="jar.id" value="${log4j.id}"/> <param name="version" value="${log4j.version}"/> <param name="license" value="${log4j.license}"/> <param name="license.url" value="${log4j.license.url}"/> </antcall> </target> <!-- Logkit --> <target name="logkit-check" unless="logkit.present" depends="setup-dependencies"> <antcall target="missing-dependency"> <param name="id" value="${logkit.id}"/> <param name="jar.id" value="${logit.id}"/> <param name="version" value="${logkit.version}"/> <param name="license" value="${logkit.license}"/> <param name="license.url" value="${logkit.license.url}"/> </antcall> </target> <target name="import-logkit" depends="setup-dependencies"> <antcall target="import-dependency"> <param name="id" value="${logkit.id}"/> <param name="jar.id" value="${logkit.id}"/> <param name="version" value="${logkit.version}"/> <param name="license" value="${logkit.license}"/> <param name="license.url" value="${logkit.license.url}"/> </antcall> </target> <!-- Rhino --> <target name="rhino-check" unless="rhino.present" depends="setup-dependencies"> <antcall target="missing-dependency"> <param name="id" value="${rhino.id}"/> <param name="jar.id" value="${rhino.jar.id}"/> <param name="version" value="${rhino.version}"/> <param name="license" value="${rhino.license}"/> <param name="license.url" value="${rhino.license.url}"/> </antcall> </target> <target name="import-rhino" depends="setup-dependencies"> <antcall target="import-dependency"> <param name="id" value="${rhino.id}"/> <param name="jar.id" value="${rhino.jar.id}"/> <param name="version" value="${rhino.version}"/> <param name="license" value="${rhino.license}"/> <param name="license.url" value="${rhino.license.url}"/> </antcall> </target> <!-- Servlet API --> <target name="servletapi-check" unless="javax.servlet.present" depends="setup-dependencies"> <antcall target="missing-dependency-noauto"> <param name="id" value="${servletapi.id}"/> <param name="jar.id" value="${servletapi.id}"/> <param name="version" value="${servletapi.version}"/> <param name="license" value="${servletapi.license}"/> <param name="license.url" value="${servletapi.license.url}"/> <param name="dist.url" value="${servletapi.dist.url}"/> </antcall> </target> 1.1 avalon-excalibur/check-targets.properties Index: check-targets.properties =================================================================== # sets up properties for the dependency download mechanism jar.repository=http://www.ibiblio.org/maven ibm.license=IBM Public License ibm.license.url=http://www.opensource.org/licenses/ibmpl.php cpl.license=Common Public License cpl.license.url=http://www.opensource.org/licenses/cpl.php asl.license=Apache Software License asl.license.url=http://www.opensource.org/licenses/apachepl.php lgpl.license=GNU Lesser General Public License lgpl.license.url=http://www.opensource.org/licenses/lgpl-license.php sbcl.license=Sun Binary Code License npl.license=Netzcape Public License npl.license.url=http://www.mozilla.org/MPL/NPL-1.1.html # BSF bsf.version=2.2 bsf.id=bsf bsf.license=${ibm.license} bsf.license.url=${ibm.license.url} bsf.dist.url=http://www-124.ibm.com/developerworks/projects/bsf # Checkstyle checkstyle.version=2.4 checkstyle.id=checkstyle checkstyle.license=${lgpl.license} checkstyle.license.url=${lgpl.license.url} checkstyle.dist.url=http://checkstyle.sf.net/ # Commons-Logging commons-logging.version=1.0.2 commons-logging.id=commons-logging commons-logging.license=${asl.license} commons-logging.license.url=${asl.license.url} commons-logging.dist.url=http://jakarta.apache.org/commons/logging/ # JavaMail javamail.version=1.2 javamail.id=javamail javamail.license=${sbcl.license} javamail.license.url=http://java.sun.com/products/javamail/ javamail.dist.url=http://java.sun.com/products/javamail/ # JDBC jdbc.version=2.0 jdbc.id=jdbc jdbc.license=${sbcl.license} jdbc.license.url=http://java.sun.com/products/jdbc/ jdbc.dist.url=http://java.sun.com/products/jdbc/ # JMS jms.version=1.0.2 jms.id=jms jms.license=${sbcl.license} jms.license.url=http://java.sun.com/products/jms/ jms.dist.url=http://java.sun.com/products/jms/ # JUnit junit.version=3.7 junit.id=junit junit.license=${cpl.license} junit.license.url=${cpl.license.url} junit.dist.url=http://www.junit.org/ # Log4J log4j.version=1.2.7 log4j.id=log4j log4j.license=${asl.license} log4j.license.url=${asl.license.url} log4j.dist.url=http://jakarta.apache.org/log4j/ # Logkit logkit.version=1.0.1 logkit.id=logkit logkit.license=${asl.license} logkit.license.url=${asl.license.url} logkit.dist.url=http://avalon.apache.org/logkit/ # Rhino rhino.version=1.5R4-RC3 rhino.id=rhino rhino.jar.id=js rhino.license=${npl.license} rhino.license.url=${npl.license.url} rhino.dist.urlftp:ftp://ftp.mozilla.org/pub/js/ # Servlet API servletapi.version=2.2 servletapi.id=servletapi servletapi.license=${sbcl.license} servletapi.license.url=http://java.sun.com/products/servlet/ servletapi.dist.url=http://java.sun.com/products/servlet/ 1.43 +11 -0 avalon-excalibur/event/build.xml Index: build.xml =================================================================== RCS file: /home/cvs/avalon-excalibur/event/build.xml,v retrieving revision 1.42 retrieving revision 1.43 diff -u -r1.42 -r1.43 --- build.xml 29 Jan 2003 14:07:39 -0000 1.42 +++ build.xml 11 Feb 2003 04:40:26 -0000 1.43 @@ -1,5 +1,9 @@ <?xml version="1.0"?> +<!DOCTYPE project [ + <!ENTITY check-targets SYSTEM "file:../check-targets.ent"> +]> + <project name="Excalibur Event" default="main" basedir="."> <property file="${user.home}/build.properties"/> @@ -18,6 +22,9 @@ <pathelement location="${avalon-framework.jar}"/> <pathelement location="${checkstyle.jar}"/> <pathelement path="${java.class.path}"/> + <fileset dir="${lib.dir}"> + <include name="*.jar" /> + </fileset> </path> <path id="tools.class.path"> @@ -38,6 +45,9 @@ </path> <property name="cp" refid="test.class.path"/> + <target name="check-dependencies" depends="junit-check"/> + <target name="import-dependencies" depends="import-junit"/> + <target name="main" depends="jar" description="Build the project"/> <target name="rebuild" depends="clean,main" description="Rebuild the project"/> @@ -391,4 +401,5 @@ <delete dir="${dist.base}" /> </target> + &check-targets; </project> 1.36 +2 -0 avalon-excalibur/event/default.properties Index: default.properties =================================================================== RCS file: /home/cvs/avalon-excalibur/event/default.properties,v retrieving revision 1.35 retrieving revision 1.36 diff -u -r1.35 -r1.36 --- default.properties 6 Feb 2003 13:38:30 -0000 1.35 +++ default.properties 11 Feb 2003 04:40:26 -0000 1.36 @@ -112,4 +112,6 @@ # property indicating directory where all distribution archives are placed dist.base = distributions +lib.dir = lib + depchecker.prefix=. 1.6 +47 -0 avalon-excalibur/event/src/java/org/apache/excalibur/event/Queue.java Index: Queue.java =================================================================== RCS file: /home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/Queue.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- Queue.java 10 Nov 2002 03:39:19 -0000 1.5 +++ Queue.java 11 Feb 2003 04:40:26 -0000 1.6 @@ -61,8 +61,55 @@ * </p> * * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a> + * @author <a href="mailto:[EMAIL PROTECTED]">schierma</a> */ public interface Queue extends Source, Sink { String ROLE = Queue.class.getName(); + + /** + * Set the enqueue predicate for this sink. This mechanism + * allows user to define a method that will 'screen' + * QueueElementIF's during the enqueue procedure to either + * accept or reject them. The enqueue predicate runs in the + * context of the caller of [EMAIL PROTECTED] #enqueue(Object)}, + * which means it must be simple and fast. This can be used + * to implement many interesting m_sink-thresholding policies, + * such as simple count threshold, credit-based mechanisms, + * and more. + * @since Feb 10, 2003 + * + * @param enqueuePredicate + * the enqueue predicate for this sink + */ + public void setEnqueuePredicate(EnqueuePredicate enqueuePredicate); + + /** + * Return the enqueue predicate for this sink. + * @since Feb 10, 2003 + * + * @return [EMAIL PROTECTED] EnqueuePredicate} + * the enqueue predicate for this sink. + */ + public EnqueuePredicate getEnqueuePredicate(); + + /** + * Set the dequeue executable for this sink. This mechanism + * allows users to define a methods that will be executed + * before or after dequeuing elements from a source + * @since Feb 10, 2003 + * + * @param executable + * The dequeue executable for this sink. + */ + public void setDequeueInterceptor(DequeueInterceptor executable); + + /** + * Return the dequeue executable for this sink. + * @since Feb 10, 2003 + * + * @return [EMAIL PROTECTED] DequeueInterceptor} + * The dequeue executable for this sink. + */ + public DequeueInterceptor getDequeueInterceptor(); } 1.16 +13 -0 avalon-excalibur/event/src/java/org/apache/excalibur/event/Sink.java Index: Sink.java =================================================================== RCS file: /home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/Sink.java,v retrieving revision 1.15 retrieving revision 1.16 diff -u -r1.15 -r1.16 --- Sink.java 30 Sep 2002 17:49:13 -0000 1.15 +++ Sink.java 11 Feb 2003 04:40:26 -0000 1.16 @@ -155,6 +155,8 @@ * thresholding, and does not always accurately report maxSize(). * * @return -1 if the sink has no length threshold. + * + * @deprecated Use the EnqueuePredicate to control this instead. */ int maxSize(); @@ -166,6 +168,8 @@ * fail, since the Sink may be serviced in the meantime. * * @return true if the Sink is full + * + * @deprecated Use the EnqueuePredicate to control this instead */ boolean isFull(); @@ -175,11 +179,20 @@ * <code>maxSize()</code>. It will return -1 if the sink is unbounded. * * @return the number of elements the Sink can accept + * + * @deprecated Use the EnqueuePredicate to control this instead. */ int canAccept(); /** * Returns the number of elements waiting in this Sink. + * + * <p><span style="color: blue;"><i>Important:</i></span> + * The contract for this method was updated to account for any elements + * that were prepared for enqueueing. It provides a more predictable + * and consistent environment, as well as making it easier for + * EnqueuePredicates to account for those elements. + * </p> * * @return the number of elements in the Sink */ 1.1 avalon-excalibur/event/src/java/org/apache/excalibur/event/DequeueInterceptor.java Index: DequeueInterceptor.java =================================================================== /* ============================================================================ The Apache Software License, Version 1.1 ============================================================================ Copyright (C) @year@ The Apache Software Foundation. All rights reserved. Redistribution and use in source and binary forms, with or without modifica- tion, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. The end-user documentation included with the redistribution, if any, must include the following acknowledgment: "This product includes software developed by the Apache Software Foundation (http://www.apache.org/)." Alternately, this acknowledgment may appear in the software itself, if and wherever such third-party acknowledgments normally appear. 4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software Foundation" must not be used to endorse or promote products derived from this software without prior written permission. For written permission, please contact [EMAIL PROTECTED] 5. Products derived from this software may not be called "Apache", nor may "Apache" appear in their name, without prior written permission of the Apache Software Foundation. THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU- DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. This software consists of voluntary contributions made by many individuals on behalf of the Apache Software Foundation. For more information on the Apache Software Foundation, please see <http://www.apache.org/>. */ package org.apache.excalibur.event; /** * The dequeue executable interface describes operations that * are executed before and after elements are pulled from a * queue. * * @version $Revision: 1.1 $ * @author <a href="mailto:[EMAIL PROTECTED]">schierma</a> */ public interface DequeueInterceptor { /** * An operation executed before dequeing events from * the queue. The Source is passed in so the implementation * can determine to execute based on the queue properties. * * <p> * This method is called once at the beginning of any <code>dequeue</code> * method regardless of how many queue elements are dequeued. * </p> * * @since Feb 10, 2003 * * @param context The source from which the dequeue is performed. */ public void before(Source context); /** * An operation executed after dequeing events from * the queue. The Source is passed in so the implementation * can determine to execute based on the queue properties. * * <p> * This method is called once at the end of any <code>dequeue</code> * method regardless of how many queue elements are dequeued. * </p> * * @since Feb 10, 2003 * * @param context The source from which the dequeue is performed. */ public void after(Source context); } 1.1 avalon-excalibur/event/src/java/org/apache/excalibur/event/EnqueuePredicate.java Index: EnqueuePredicate.java =================================================================== /* ============================================================================ The Apache Software License, Version 1.1 ============================================================================ Copyright (C) @year@ The Apache Software Foundation. All rights reserved. Redistribution and use in source and binary forms, with or without modifica- tion, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. The end-user documentation included with the redistribution, if any, must include the following acknowledgment: "This product includes software developed by the Apache Software Foundation (http://www.apache.org/)." Alternately, this acknowledgment may appear in the software itself, if and wherever such third-party acknowledgments normally appear. 4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software Foundation" must not be used to endorse or promote products derived from this software without prior written permission. For written permission, please contact [EMAIL PROTECTED] 5. Products derived from this software may not be called "Apache", nor may "Apache" appear in their name, without prior written permission of the Apache Software Foundation. THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU- DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. This software consists of voluntary contributions made by many individuals on behalf of the Apache Software Foundation. For more information on the Apache Software Foundation, please see <http://www.apache.org/>. */ package org.apache.excalibur.event; import org.apache.excalibur.event.Sink; /** * Enqueue predicates allow users to specify a method that * will 'screen' elements being enqueued onto a sink, either * accepting or rejecting them. This mechanism can be used * to implement many interesting load-conditioning policies, * for example, simple thresholding, rate control, credit-based * flow control, and so forth. Note that the enqueue predicate * runs in the context of the <b>caller of enqueue()</b>, which * means it must be simple and fast. * * @version $Revision: 1.1 $ * @author <a href="mailto:[EMAIL PROTECTED]">schierma</a> */ public interface EnqueuePredicate { /** * Tests the given element for acceptance onto the m_sink. * @since Feb 10, 2003 * * @param element The element to enqueue * @param modifyingSing The sink that is used for this predicate * @return * <code>true</code> if the sink accepts the element; * <code>false</code> otherwise. */ boolean accept(Object element, Sink modifyingSink); /** * Tests the given element for acceptance onto the m_sink. * @since Feb 10, 2003 * * @param elements The array of elements to enqueue * @param modifyingSing The sink that is used for this predicate * @return * <code>true</code> if the sink accepts all the elements; * <code>false</code> otherwise. */ boolean accept(Object elements[], Sink modifyingSink); } 1.4 +50 -0 avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/AbstractQueue.java Index: AbstractQueue.java =================================================================== RCS file: /home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/AbstractQueue.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- AbstractQueue.java 30 Sep 2002 16:17:01 -0000 1.3 +++ AbstractQueue.java 11 Feb 2003 04:40:26 -0000 1.4 @@ -50,6 +50,8 @@ package org.apache.excalibur.event.impl; import org.apache.excalibur.event.Queue; +import org.apache.excalibur.event.EnqueuePredicate; +import org.apache.excalibur.event.DequeueInterceptor; /** * Provides the base functionality for the other <code>Queue</code> types. @@ -63,6 +65,8 @@ protected final static Object[] EMPTY_ARRAY = new Object[ 0 ]; /** The number of milliseconds to wait */ protected long m_timeout = 0; + protected EnqueuePredicate m_predicate = new NullEnqueuePredicate(); + protected DequeueInterceptor m_interceptor = new NullDequeueInterceptor(); /** * Default for canAccept() @@ -140,5 +144,51 @@ } } } + } + + /** + * Set the EnqueuePredicate to limit entries into this Queue. + */ + public void setEnqueuePredicate( EnqueuePredicate predicate ) + { + if ( null == predicate ) throw new NullPointerException( "predicate" ); + + m_predicate = predicate; + } + + /** + * Return the EnqueuePredicate that is already set for this Queue. + */ + public EnqueuePredicate getEnqueuePredicate() + { + return m_predicate; + } + + /** + * Set the dequeue executable for this sink. This mechanism + * allows users to define a methods that will be executed + * before or after dequeuing elements from a source + * @since Sep 23, 2002 + * + * @param executable + * The dequeue executable for this sink. + */ + public void setDequeueInterceptor(DequeueInterceptor executable) + { + if ( null == executable ) throw new NullPointerException( "executable" ); + + m_interceptor = executable; + } + + /** + * Return the dequeue executable for this sink. + * @since Sep 23, 2002 + * + * @return [EMAIL PROTECTED] DequeueInterceptor} + * The dequeue executable for this sink. + */ + public DequeueInterceptor getDequeueInterceptor() + { + return m_interceptor; } } 1.6 +36 -27 avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/DefaultQueue.java Index: DefaultQueue.java =================================================================== RCS file: /home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/DefaultQueue.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- DefaultQueue.java 2 Oct 2002 17:49:20 -0000 1.5 +++ DefaultQueue.java 11 Feb 2003 04:40:26 -0000 1.6 @@ -52,6 +52,7 @@ import org.apache.commons.collections.Buffer; import org.apache.commons.collections.UnboundedFifoBuffer; import EDU.oswego.cs.dl.util.concurrent.ReentrantLock; +import org.apache.excalibur.event.EnqueuePredicate; import org.apache.excalibur.event.PreparedEnqueue; import org.apache.excalibur.event.SinkException; import org.apache.excalibur.event.SinkFullException; @@ -81,22 +82,17 @@ */ public DefaultQueue( int size ) { - int maxSize; + this( new ThresholdEnqueuePredicate( size ) ); + } - if( size > 0 ) - { - m_elements = new UnboundedFifoBuffer( size ); - maxSize = size; - } - else - { - m_elements = new UnboundedFifoBuffer(); - maxSize = -1; - } + public DefaultQueue( EnqueuePredicate predicate ) + { + setEnqueuePredicate( predicate ); m_mutex = new ReentrantLock(); + m_elements = new UnboundedFifoBuffer(); m_reserve = 0; - m_maxSize = maxSize; + m_maxSize = -1; } /** @@ -104,17 +100,17 @@ */ public DefaultQueue() { - this( -1 ); + this( new NullEnqueuePredicate() ); } /** * Return the number of elements currently in the <code>Queue</code>. * - * @return <code>int</code> representing the number of elements. + * @return <code>int</code> representing the number of elements (including the reserved ones). */ public int size() { - return m_elements.size(); + return m_elements.size() + m_reserve; } /** @@ -139,13 +135,14 @@ m_mutex.acquire(); try { - - if( maxSize() > 0 && elements.length + m_reserve + size() > maxSize() ) + if( getEnqueuePredicate().accept(elements, this) ) + { + enqueue = new DefaultPreparedEnqueue( this, elements ); + } + else { throw new SinkFullException( "Not enough room to enqueue these elements." ); } - - enqueue = new DefaultPreparedEnqueue( this, elements ); } finally { @@ -154,6 +151,10 @@ } catch( InterruptedException ie ) { + if ( null == enqueue ) + { + throw new SinkException("The mutex was interrupted before it could be released"); + } } return enqueue; @@ -168,14 +169,12 @@ m_mutex.acquire(); try { + success = getEnqueuePredicate().accept( element, this ); - if( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) + if ( success ) { - return false; + m_elements.add( element ); } - - m_elements.add( element ); - success = true; } finally { @@ -199,7 +198,7 @@ m_mutex.acquire(); try { - if( maxSize() > 0 && elements.length + m_reserve + size() > maxSize() ) + if( ! getEnqueuePredicate().accept( elements, this ) ) { throw new SinkFullException( "Not enough room to enqueue these elements." ); } @@ -227,7 +226,7 @@ m_mutex.acquire(); try { - if( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) + if( ! getEnqueuePredicate().accept(element, this) ) { throw new SinkFullException( "Not enough room to enqueue these elements." ); } @@ -246,6 +245,7 @@ public Object[] dequeue( final int numElements ) { + getDequeueInterceptor().before(this); Object[] elements = EMPTY_ARRAY; try @@ -266,13 +266,16 @@ } catch( InterruptedException ie ) { + //TODO: exception handling } + getDequeueInterceptor().after(this); return elements; } public Object[] dequeueAll() { + getDequeueInterceptor().before(this); Object[] elements = EMPTY_ARRAY; try @@ -291,8 +294,10 @@ } catch( InterruptedException ie ) { + // TODO: exception hanlding } + getDequeueInterceptor().after(this); return elements; } @@ -321,6 +326,7 @@ public Object dequeue() { + getDequeueInterceptor().before(this); Object element = null; try @@ -342,8 +348,10 @@ } catch( InterruptedException ie ) { + // TODO: exception handling } + getDequeueInterceptor().after(this); return element; } @@ -356,6 +364,7 @@ { m_parent = parent; m_elements = elements; + m_parent.m_reserve += elements.length; } public void commit() @@ -367,8 +376,8 @@ try { - m_parent.enqueue( m_elements ); m_parent.m_reserve -= m_elements.length; + m_parent.enqueue( m_elements ); m_elements = null; } catch( Exception e ) 1.6 +8 -5 avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/FixedSizeQueue.java Index: FixedSizeQueue.java =================================================================== RCS file: /home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/FixedSizeQueue.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- FixedSizeQueue.java 2 Oct 2002 17:49:20 -0000 1.5 +++ FixedSizeQueue.java 11 Feb 2003 04:40:26 -0000 1.6 @@ -60,6 +60,8 @@ * changed. * * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a> + * + * @deprecated Use the DefaultQueue as it properly supports the EnqueuePredicates */ public final class FixedSizeQueue extends AbstractQueue @@ -98,7 +100,7 @@ size = m_end - m_start; } - return size; + return size + m_reserve; } public int maxSize() @@ -116,7 +118,7 @@ m_mutex.acquire(); try { - if( elements.length + m_reserve + size() > maxSize() ) + if( elements.length + size() > maxSize() ) { throw new SinkFullException( "Not enough room to enqueue these elements." ); } @@ -144,7 +146,7 @@ m_mutex.acquire(); try { - if( 1 + m_reserve + size() > maxSize() ) + if( 1 + size() > maxSize() ) { return false; } @@ -174,7 +176,7 @@ m_mutex.acquire(); try { - if( elements.length + m_reserve + size() > maxSize() ) + if( elements.length + size() > maxSize() ) { throw new SinkFullException( "Not enough room to enqueue these elements." ); } @@ -202,7 +204,7 @@ m_mutex.acquire(); try { - if( 1 + m_reserve + size() > maxSize() ) + if( 1 + size() > maxSize() ) { throw new SinkFullException( "Not enough room to enqueue these elements." ); } @@ -361,6 +363,7 @@ { m_parent = parent; m_elements = elements; + m_parent.m_reserve += m_elements.length; } public void commit() 1.1 avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/LossyMultiCastSink.java Index: LossyMultiCastSink.java =================================================================== /* ============================================================================ The Apache Software License, Version 1.1 ============================================================================ Copyright (C) @year@ The Apache Software Foundation. All rights reserved. Redistribution and use in source and binary forms, with or without modifica- tion, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. The end-user documentation included with the redistribution, if any, must include the following acknowledgment: "This product includes software developed by the Apache Software Foundation (http://www.apache.org/)." Alternately, this acknowledgment may appear in the software itself, if and wherever such third-party acknowledgments normally appear. 4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software Foundation" must not be used to endorse or promote products derived from this software without prior written permission. For written permission, please contact [EMAIL PROTECTED] 5. Products derived from this software may not be called "Apache", nor may "Apache" appear in their name, without prior written permission of the Apache Software Foundation. THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU- DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. This software consists of voluntary contributions made by many individuals on behalf of the Apache Software Foundation. For more information on the Apache Software Foundation, please see <http://www.apache.org/>. */ package org.apache.excalibur.event.impl; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import org.apache.excalibur.event.PreparedEnqueue; import org.apache.excalibur.event.Sink; import org.apache.excalibur.event.SinkException; import org.apache.excalibur.event.SinkFullException; /** * This is a [EMAIL PROTECTED] org.apache.excalibur.event.seda.event.Sink} * implementation that multicasts enqueue operations to the * contained and concrete sink objects. Compared to the * regular [EMAIL PROTECTED] org.apache.excalibur.event.seda.event.impl.MultiCastSink} * this sink works in that it delivers zero, one or more sinks. * It can be configured to fail when less than one sink was * delivered to. * * @version $Revision: 1.1 $ * @author <a href="mailto:[EMAIL PROTECTED]">schierma</a> */ public class LossyMultiCastSink { /** * A collection of sink arrays representing the * sinks to enqueue the element to. */ private final Collection m_sinks; /** * The size of the sink. */ private final int m_size; /** * indicates if at least one enqueue operation must succeed. */ private final boolean m_oneSuccess; //---------------------- LossyMultiCastSink constructors /** * This constructor creates a failure tolerant sink * based on the collection of sink arrays. None of * the enqueue operations must succeed. * @since May 16, 2002 * * @param sinks * A collection of sink arrays for each stage. */ public LossyMultiCastSink(Collection sinks) { this(sinks, false); } /** * This constructor creates a failure tolerant sink * based on the collection of sink arrays. The additional * boolean flag describes whether at least one or none * of the enqueue operations must succeed. * @since May 16, 2002 * * @param sinks * A collection of sink arrays for each stage. */ public LossyMultiCastSink(Collection sinks, boolean oneSuccess) { m_sinks = sinks; m_size = -1; m_oneSuccess = oneSuccess; } //---------------------- Sink implementation /** * @see Sink#canAccept() */ public int canAccept() { return 0; } /** * @see Sink#isFull() */ public boolean isFull() { return false; } /** * @see Sink#maxSize() */ public int maxSize() { return 0; } /** * @see Sink#enqueue(Object) */ public void enqueue(Object element) throws SinkException { final Iterator sinks = m_sinks.iterator(); int successful = 0; //checkEnqueuePredicate(new Object[] { element }); // iterate through the sinks and try to enqueue while (sinks.hasNext()) { final Sink sink = (Sink) sinks.next(); final boolean enqueued = sink.tryEnqueue(element); // enqueue only to the first successful sink if (enqueued) { successful++; break; } } if (successful == 0 && m_oneSuccess) { throw new SinkFullException("Could not deliver one single element."); } } /** * @see Sink#enqueue(Object[]) */ public void enqueue(Object[] elements) throws SinkException { final Iterator sinks = m_sinks.iterator(); int successful = 0; //checkEnqueuePredicate(elements); // iterate through the sinks and try to enqueue while (sinks.hasNext()) { final Sink sink = (Sink) sinks.next(); try { sink.enqueue(elements); } catch (SinkFullException e) { continue; } // if enqueue successful break here successful++; break; } if (successful == 0 && m_oneSuccess) { throw new SinkFullException("Could not deliver one single elements."); } } /** * @see Sink#tryEnqueue(Object) */ public boolean tryEnqueue(Object element) { try { enqueue(element); return true; } catch (SinkException e) { return !m_oneSuccess; } } /** * @see Sink#prepareEnqueue(Object[]) */ public PreparedEnqueue prepareEnqueue(Object[] elements) throws SinkException { final Iterator sinks = m_sinks.iterator(); final DefaultPreparedEnqueue prepares = new DefaultPreparedEnqueue(); int successful = 0; //checkEnqueuePredicate(elements); // iterate through the sinks and try to enqueue while (sinks.hasNext()) { final Sink sink = (Sink) sinks.next(); try { prepares.addPreparedEnqueue(sink.prepareEnqueue(elements)); } catch (SinkFullException e) { continue; } // if enqueue successful break here successful++; break; } if (successful == 0 && m_oneSuccess) { throw new SinkFullException("Could not deliver elements at all."); } return prepares; } /** * @see Sink#size() */ public int size() { return m_size; } //------------------------- LossyMultiCastSink inner classes /** * A prepared enqueue object that holds other prepared * enqueue objects and allows to perform a commit / abort * on all of these objects. * @since May 16, 2002 * * @author <a href = "mailto:[EMAIL PROTECTED]">schierma</a> */ private static final class DefaultPreparedEnqueue implements PreparedEnqueue { /** * A collection of prepared enqueue objects */ private final Collection m_preparedEnqueues = new LinkedList(); //------------------------ PreparedEnqueue implementation /** * @see PreparedEnqueue#abort() */ public void abort() { final Iterator iter = m_preparedEnqueues.iterator(); while (iter.hasNext()) { ((PreparedEnqueue) iter.next()).abort(); } } /** * @see PreparedEnqueue#commit() */ public void commit() { final Iterator iter = m_preparedEnqueues.iterator(); while (iter.hasNext()) { ((PreparedEnqueue) iter.next()).commit(); } } //------------------------ DefaultPreparedEnqueue specific implementation /** * Adds a prepared enqueue object to the list * of prepared enqueues. * @since May 16, 2002 * * @param preparedEnqueue * The prepared enqueue object to be added. */ public void addPreparedEnqueue(PreparedEnqueue preparedEnqueue) { m_preparedEnqueues.add(preparedEnqueue); } } //-- end DefaultPreparedEnqueue inner class } 1.1 avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/MultiCastSink.java Index: MultiCastSink.java =================================================================== /* ============================================================================ The Apache Software License, Version 1.1 ============================================================================ Copyright (C) @year@ The Apache Software Foundation. All rights reserved. Redistribution and use in source and binary forms, with or without modifica- tion, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. The end-user documentation included with the redistribution, if any, must include the following acknowledgment: "This product includes software developed by the Apache Software Foundation (http://www.apache.org/)." Alternately, this acknowledgment may appear in the software itself, if and wherever such third-party acknowledgments normally appear. 4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software Foundation" must not be used to endorse or promote products derived from this software without prior written permission. For written permission, please contact [EMAIL PROTECTED] 5. Products derived from this software may not be called "Apache", nor may "Apache" appear in their name, without prior written permission of the Apache Software Foundation. THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU- DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. This software consists of voluntary contributions made by many individuals on behalf of the Apache Software Foundation. For more information on the Apache Software Foundation, please see <http://www.apache.org/>. */ package org.apache.excalibur.event.ext; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import org.apache.excalibur.event.PreparedEnqueue; import org.apache.excalibur.event.Sink; import org.apache.excalibur.event.SinkException; import org.apache.excalibur.event.SinkFullException; /** * This is a [EMAIL PROTECTED] org.apache.excalibur.event.seda.event.Sink} * implementation that multicasts enqueue operations to the * contained and concrete sink objects. The multi cast sink * will try to enqueue and only succeeds if no element was * rejected from any sink. The sink can be configured to * enqueue into one sink alone or all sinks. * If a sink array in the collection of sinks contains more * than one sink the multicast sink will try to enqueue the * element always to <b>only one</b> of these sinks. * * @version $Revision: 1.1 $ * @author <a href="mailto:[EMAIL PROTECTED]">schierma</a> */ public class MultiCastSink { /** A collection of sink arrays representing the sinks to enqueue to. */ private final Collection m_sinks; /** The size of the sink. */ private final int m_size; /** Boolean value describing if one or all operations must succeed. */ private final boolean m_single; //---------------------- LossyMultiCastSink constructors /** * This constructor creates a failure in-tolerant multicast * sink based on the collection of sink arrays. The delivery * must succeed for all sinks in the collection or it will * fail entirely. * @since May 16, 2002 * * @param sinks * A collection of sink arrays for each stage. */ public MultiCastSink(Collection sinks) { this(sinks, false); } /** * This constructor creates a failure in-tolerant multicast * sink based on the collection of sink arrays. * @since May 16, 2002 * * @param sinks * A collection of sink arrays for each stage. * @param single * <m_code>true</m_code> if just one operation must succeed. * <m_code>false</m_code> if all operations must succeed. */ public MultiCastSink(Collection sinks, boolean single) { m_sinks = sinks; m_size = -1; m_single = single; } //---------------------- Sink implementation /** * @see Sink#canAccept() */ public int canAccept() { return 0; } /** * @see Sink#isFull() */ public boolean isFull() { return false; } /** * @see Sink#maxSize() */ public int maxSize() { return 0; } /** * @see Sink#enqueue(Object) */ public void enqueue(Object element) throws SinkException { final PreparedEnqueue prepared; prepared = prepareEnqueue(new Object[] { element }); prepared.commit(); } /** * @see Sink#enqueue(Object[]) */ public void enqueue(Object[] elements) throws SinkException { final PreparedEnqueue prepared = prepareEnqueue(elements); prepared.commit(); } /** * @see Sink#tryEnqueue(Object) */ public boolean tryEnqueue(Object element) { try { enqueue(element); return true; } catch (SinkException e) { return false; } } /** * @see Sink#prepareEnqueue(Object[]) */ public PreparedEnqueue prepareEnqueue(Object[] elements) throws SinkException { //checkEnqueuePredicate(elements); final DefaultPreparedEnqueue prepares = new DefaultPreparedEnqueue(); int successful = 0; final Iterator sinks = m_sinks.iterator(); // iterate through the sinks and try to enqueue while (sinks.hasNext()) { final Sink sink = (Sink) sinks.next(); try { prepares.addPreparedEnqueue(sink.prepareEnqueue(elements)); } catch (SinkFullException e) { continue; } // if enqueue successful return here or just break and continue if (m_single) { return prepares; } successful++; break; } if (successful < m_sinks.size()) { // rollback all enqueues. prepares.abort(); throw new SinkFullException("Could not deliver elements."); } return prepares; } /** * @see Sink#size() */ public int size() { return m_size; } //------------------------- LossyMultiCastSink inner classes /** * A prepared enqueue object that holds other prepared * enqueue objects and allows to perform a commit / abort * on all of these objects. * @since May 16, 2002 * * @author <a href = "mailto:[EMAIL PROTECTED]">schierma</a> */ private static final class DefaultPreparedEnqueue implements PreparedEnqueue { /** * A collection of prepared enqueue objects */ private final Collection m_preparedEnqueues = new LinkedList(); //------------------------ PreparedEnqueue implementation /** * @see PreparedEnqueue#abort() */ public void abort() { final Iterator iter = m_preparedEnqueues.iterator(); while (iter.hasNext()) { ((PreparedEnqueue) iter.next()).abort(); } } /** * @see PreparedEnqueue#commit() */ public void commit() { final Iterator iter = m_preparedEnqueues.iterator(); while (iter.hasNext()) { ((PreparedEnqueue) iter.next()).commit(); } } //------------------------ DefaultPreparedEnqueue specific implementation /** * Adds a prepared enqueue object to the list * of prepared enqueues. * @since May 16, 2002 * * @param preparedEnqueue * The prepared enqueue object to be added. */ public void addPreparedEnqueue(PreparedEnqueue preparedEnqueue) { m_preparedEnqueues.add(preparedEnqueue); } } //-- end DefaultPreparedEnqueue inner class } 1.1 avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/NullDequeueInterceptor.java Index: NullDequeueInterceptor.java =================================================================== /* ============================================================================ The Apache Software License, Version 1.1 ============================================================================ Copyright (C) @year@ The Apache Software Foundation. All rights reserved. Redistribution and use in source and binary forms, with or without modifica- tion, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. The end-user documentation included with the redistribution, if any, must include the following acknowledgment: "This product includes software developed by the Apache Software Foundation (http://www.apache.org/)." Alternately, this acknowledgment may appear in the software itself, if and wherever such third-party acknowledgments normally appear. 4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software Foundation" must not be used to endorse or promote products derived from this software without prior written permission. For written permission, please contact [EMAIL PROTECTED] 5. Products derived from this software may not be called "Apache", nor may "Apache" appear in their name, without prior written permission of the Apache Software Foundation. THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU- DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. This software consists of voluntary contributions made by many individuals on behalf of the Apache Software Foundation. For more information on the Apache Software Foundation, please see <http://www.apache.org/>. */ package org.apache.excalibur.event.impl; import org.apache.excalibur.event.DequeueInterceptor; import org.apache.excalibur.event.Source; /** * The dequeue executable interface describes operations that * are executed before and after elements are pulled from a * queue. * * @version $Revision: 1.1 $ * @author <a href="mailto:[EMAIL PROTECTED]">schierma</a> * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a> */ public final class NullDequeueInterceptor implements DequeueInterceptor { /** * An operation executed before dequeing events from * the queue. The size of the queue is passed in so the * implementation can determine to execute based on the * size of the queue. * @since Feb 10, 2003 * * @param context * The source from which the dequeue is performed. */ public void before(Source context) {} /** * An operation executed after dequeing events from * the queue. The size of the queue is passed in so the * implementation can determine to execute based on the * size of the queue. * @since Feb 10, 2003 * * @param context * The source from which the dequeue is performed. */ public void after(Source context) {} } 1.1 avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/NullEnqueuePredicate.java Index: NullEnqueuePredicate.java =================================================================== /* ============================================================================ The Apache Software License, Version 1.1 ============================================================================ Copyright (C) @year@ The Apache Software Foundation. All rights reserved. Redistribution and use in source and binary forms, with or without modifica- tion, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. The end-user documentation included with the redistribution, if any, must include the following acknowledgment: "This product includes software developed by the Apache Software Foundation (http://www.apache.org/)." Alternately, this acknowledgment may appear in the software itself, if and wherever such third-party acknowledgments normally appear. 4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software Foundation" must not be used to endorse or promote products derived from this software without prior written permission. For written permission, please contact [EMAIL PROTECTED] 5. Products derived from this software may not be called "Apache", nor may "Apache" appear in their name, without prior written permission of the Apache Software Foundation. THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU- DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. This software consists of voluntary contributions made by many individuals on behalf of the Apache Software Foundation. For more information on the Apache Software Foundation, please see <http://www.apache.org/>. */ package org.apache.excalibur.event.impl; import org.apache.excalibur.event.EnqueuePredicate; import org.apache.excalibur.event.Sink; /** * The NullEnqueuePredicate does nothing to limit a Queue's ability to enqueue. */ public final class NullEnqueuePredicate implements EnqueuePredicate { public boolean accept(Object element, Sink modifyingSink) { return true; } public boolean accept(Object[] element, Sink modifyingSink) { return true; } } 1.1 avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/RateLimitingPredicate.java Index: RateLimitingPredicate.java =================================================================== /* ============================================================================ The Apache Software License, Version 1.1 ============================================================================ Copyright (C) @year@ The Apache Software Foundation. All rights reserved. Redistribution and use in source and binary forms, with or without modifica- tion, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. The end-user documentation included with the redistribution, if any, must include the following acknowledgment: "This product includes software developed by the Apache Software Foundation (http://www.apache.org/)." Alternately, this acknowledgment may appear in the software itself, if and wherever such third-party acknowledgments normally appear. 4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software Foundation" must not be used to endorse or promote products derived from this software without prior written permission. For written permission, please contact [EMAIL PROTECTED] 5. Products derived from this software may not be called "Apache", nor may "Apache" appear in their name, without prior written permission of the Apache Software Foundation. THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU- DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. This software consists of voluntary contributions made by many individuals on behalf of the Apache Software Foundation. For more information on the Apache Software Foundation, please see <http://www.apache.org/>. */ package org.apache.excalibur.event.impl; import org.apache.excalibur.event.EnqueuePredicate; import org.apache.excalibur.event.Sink; /** * This enqueue predicate implements input rate policing. * * @version $Revision: 1.1 $ * @author <a href="mailto:[EMAIL PROTECTED]">schierma</a> */ public class RateLimitingPredicate implements EnqueuePredicate { /** The rate to which the enqueuing should be limited */ private double m_targetRate; /** The depth of the token bucket */ private int m_depth; private int m_tokenCount; private long m_lastTime; private double m_regenTimeMs; /** Number of milliseconds between regenerations */ private static final long MIN_REGENERATION_TIME = 0; //------------------------- RateLimitingPredicate constructors /** * Create a new RateLimitingPredicate for the given sink, * bucket depth and no rate limit. * @since May 15, 2002 * * @param depth * The token bucket depth. */ public RateLimitingPredicate(int depth) { this(-1.0, depth); } /** * Create a new RateLimitingPredicate for the given sink, * targetRate, and token bucket depth. * A rate of <m_code>-1.0</m_code> indicates no rate limit. * @since May 15, 2002 * * @param targetRate * The rate that is the target for this predicate * @param depth * The token bucket depth. */ public RateLimitingPredicate(double targetRate, int depth) { m_targetRate = targetRate; m_depth = depth; m_regenTimeMs = (1.0 / targetRate) * 1.0e3; if (m_regenTimeMs < 1) { m_regenTimeMs = 1; } m_tokenCount = depth; m_lastTime = System.currentTimeMillis(); } //------------------------- EnqueuePredicate implementation /** * @see EnqueuePredicate#accept(Object, Sink) */ public boolean accept(Object element, Sink sink) { if (m_targetRate == -1.0) { return true; } // First regenerate tokens long currentTime = System.currentTimeMillis(); long delay = currentTime - m_lastTime; if (delay >= MIN_REGENERATION_TIME) { double numTokens = ((double) delay * 1.0) / (m_regenTimeMs * 1.0); m_tokenCount += numTokens; if (m_tokenCount > m_depth) { m_tokenCount = m_depth; } m_lastTime = currentTime; } if (m_tokenCount >= 1) { m_tokenCount--; return true; } else { return false; } } /** * @see EnqueuePredicate#accept(Object, Sink) */ public boolean accept(Object[] elements, Sink sink) { if (m_targetRate == -1.0) { return true; } // First regenerate tokens long currentTime = System.currentTimeMillis(); long delay = currentTime - m_lastTime; if (delay >= MIN_REGENERATION_TIME) { double numTokens = ((double) delay * 1.0) / (m_regenTimeMs * 1.0); m_tokenCount += numTokens; if (m_tokenCount > m_depth) { m_tokenCount = m_depth; } m_lastTime = currentTime; } if (m_tokenCount >= elements.length) { m_tokenCount -= elements.length; return true; } else { return false; } } //------------------------- RateLimitingPredicate specific implementation /** * Returns the current rate limit. * @since May 15, 2002 * * @return double * the current target rate */ public double getTargetRate() { return m_targetRate; } /** * Returns the current depth. * @since May 15, 2002 * * @return int * The current bucket depth. */ public int getDepth() { return m_depth; } /** * Returns the number of tokens currently in the bucket. * @since May 15, 2002 * * @return int * the number of tokens currently in the bucket. */ public int getBucketSize() { return (int) m_tokenCount; } /** * Allows to set the rate limit. A limit of <m_code>-1.0</m_code> * indicates no rate limit. * @since May 15, 2002 * * @param targetRate * the current rate limit. */ public void setTargetRate(double targetRate) { m_targetRate = targetRate; m_regenTimeMs = (1.0 / targetRate) * 1.0e3; if (m_regenTimeMs < 1) { m_regenTimeMs = 1; } } /** * Allows to set the bucket depth. * @since May 15, 2002 * * @param depth * The bucket depth as an integer. */ public void setDepth(int depth) { m_depth = depth; } } 1.1 avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/ThresholdEnqueuePredicate.java Index: ThresholdEnqueuePredicate.java =================================================================== /* ============================================================================ The Apache Software License, Version 1.1 ============================================================================ Copyright (C) @year@ The Apache Software Foundation. All rights reserved. Redistribution and use in source and binary forms, with or without modifica- tion, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. The end-user documentation included with the redistribution, if any, must include the following acknowledgment: "This product includes software developed by the Apache Software Foundation (http://www.apache.org/)." Alternately, this acknowledgment may appear in the software itself, if and wherever such third-party acknowledgments normally appear. 4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software Foundation" must not be used to endorse or promote products derived from this software without prior written permission. For written permission, please contact [EMAIL PROTECTED] 5. Products derived from this software may not be called "Apache", nor may "Apache" appear in their name, without prior written permission of the Apache Software Foundation. THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU- DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. This software consists of voluntary contributions made by many individuals on behalf of the Apache Software Foundation. For more information on the Apache Software Foundation, please see <http://www.apache.org/>. */ package org.apache.excalibur.event.impl; import org.apache.excalibur.event.EnqueuePredicate; import org.apache.excalibur.event.Sink; /** * The ThresholdEnqueuePredicate limits the elements that can be enqueued * based on the size of the Queue. */ public final class ThresholdEnqueuePredicate implements EnqueuePredicate { private final int m_threshold; /** * Create a new ThresholdEnqueuePredicate with the supplied limit. * * @param limit A number greater than zero */ public ThresholdEnqueuePredicate(int limit) { m_threshold = limit; } /** * Returns true if the Sink size + 1 (the element) is less than the * threshold. */ public boolean accept(Object element, Sink modifyingSink) { if ( m_threshold <=0 ) return true; return (modifyingSink.size() + 1) < m_threshold; } /** * Returns true if the Sink size + the number of elements is less than * the threshold. */ public boolean accept(Object[] elements, Sink modifyingSink) { if ( m_threshold <=0 ) return true; return (modifyingSink.size() + elements.length) < m_threshold; } } 1.8 +2 -2 avalon-excalibur/event/src/test/org/apache/excalibur/event/test/AbstractQueueTestCase.java Index: AbstractQueueTestCase.java =================================================================== RCS file: /home/cvs/avalon-excalibur/event/src/test/org/apache/excalibur/event/test/AbstractQueueTestCase.java,v retrieving revision 1.7 retrieving revision 1.8 diff -u -r1.7 -r1.8 --- AbstractQueueTestCase.java 3 Sep 2002 17:10:56 -0000 1.7 +++ AbstractQueueTestCase.java 11 Feb 2003 04:40:27 -0000 1.8 @@ -169,12 +169,12 @@ assertEquals( 0, queue.size() ); PreparedEnqueue prep = queue.prepareEnqueue( elements ); - assertEquals( 0, queue.size() ); + assertEquals( 10, queue.size() ); prep.abort(); assertEquals( 0, queue.size() ); prep = queue.prepareEnqueue( elements ); - assertEquals( 0, queue.size() ); + assertEquals( 10, queue.size() ); prep.commit(); assertEquals( 10, queue.size() ); 1.11 +7 -1 avalon-excalibur/event/src/test/org/apache/excalibur/event/test/FixedSizeQueueTestCase.java Index: FixedSizeQueueTestCase.java =================================================================== RCS file: /home/cvs/avalon-excalibur/event/src/test/org/apache/excalibur/event/test/FixedSizeQueueTestCase.java,v retrieving revision 1.10 retrieving revision 1.11 diff -u -r1.10 -r1.11 --- FixedSizeQueueTestCase.java 3 Sep 2002 17:43:01 -0000 1.10 +++ FixedSizeQueueTestCase.java 11 Feb 2003 04:40:27 -0000 1.11 @@ -49,7 +49,7 @@ */ package org.apache.excalibur.event.test; -import org.apache.excalibur.event.impl.FixedSizeQueue; +import org.apache.excalibur.event.impl.*; /** * The default queue implementation is a variabl size queue. @@ -67,5 +67,11 @@ throws Exception { this.performQueue( new FixedSizeQueue( 32 ) ); + } + + public void testThresholdDefaultQueue() + throws Exception + { + this.performQueue( new DefaultQueue( new ThresholdEnqueuePredicate( 32 ) ) ); } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]