bloritsch    2003/02/10 20:40:27

  Modified:    event    build.xml default.properties
               event/src/java/org/apache/excalibur/event Queue.java
                        Sink.java
               event/src/java/org/apache/excalibur/event/impl
                        AbstractQueue.java DefaultQueue.java
                        FixedSizeQueue.java
               event/src/test/org/apache/excalibur/event/test
                        AbstractQueueTestCase.java
                        FixedSizeQueueTestCase.java
  Added:       .        check-targets.ent check-targets.properties
               event/src/java/org/apache/excalibur/event
                        DequeueInterceptor.java EnqueuePredicate.java
               event/src/java/org/apache/excalibur/event/impl
                        LossyMultiCastSink.java MultiCastSink.java
                        NullDequeueInterceptor.java
                        NullEnqueuePredicate.java
                        RateLimitingPredicate.java
                        ThresholdEnqueuePredicate.java
  Removed:     event/src/silk/java/org/apache/excalibur/event/ext
                        AbstractQueue.java AbstractSimpleSink.java
                        AbstractSink.java DefaultDequeueInterceptor.java
                        DefaultPredicate.java DefaultQueue.java
                        DequeueInterceptor.java
                        DequeueInterceptorSource.java EnqueuePredicate.java
                        EnqueuePredicateSink.java FixedSizeQueue.java
                        LossyMultiCastSink.java MultiCastSink.java
                        RateLimitingPredicate.java ThresholdPredicate.java
  Log:
  merge important parts of silk proposal into event, the rest will go in the 
SEDA project
  
  Revision  Changes    Path
  1.1                  avalon-excalibur/check-targets.ent
  
  Index: check-targets.ent
  ===================================================================
  <!--
  This build.xml snippet contains targets for ant 1.5 that check whether
  a specific libary is present, and offers targets for downloading them
  if they're not.
  
  It's kinda of a replacement for some maven functionality until maven
  becomes stable.
  
  Usage:
  1) Copy this file to somewhere in your project.
  2) Add the following to the top of your project's Ant build.xml script
  (adjusting the path):
  
    <!DOCTYPE project [
      <!ENTITY check-targets SYSTEM "file:./check-targets.ent">
    ]>
  
  3) Before the closing '</project>' in your build.xml, add this:
  
    &check-targets;
  
  4) define the properties
      - lib.dir
      - project.class.path
      - Name
  
  This is like expanding a macro: it pulls in the contents of this file.
  
  A minimal build.xml would thus be:
  
  <!DOCTYPE project [
  <!ENTITY check-targets SYSTEM "file:./check-targets.ent">
  ]>
  
  <project default="check-all-dependencies">
      <property name="Name" value="My Project"/>
      <property name="lib.dir" value="lib"/>
      <path id="project.class.path">
          <fileset dir="${lib.dir}">
              <include name="*.jar" />
          </fileset>
      </path>
  
      &check-targets;
  </project>
  -->
  
      <!-- Set up properties -->
      <property file="check-targets.properties"/>
      <property file="../check-targets.properties"/>
      <target name="setup-dependencies">
          <available property="bsf.present"
                 classname="com.ibm.bsf.BSFEngine"
                 classpathref="project.class.path" />
          <available property="checkstyle.present"
                 classname="com.puppycrawl.tools.checkstyle.Checker"
                 classpathref="project.class.path" />
          <available property="commons-logging.present"
             classname="org.apache.comons.logging.Log"
             classpathref="project.class.path" />
        <available property="javax.jms.present"
                 classname="javax.jms.TopicConnection"
                 classpathref="project.class.path" />
          <available property="javax.mail.present"
                 classname="javax.mail.Message"
                 classpathref="project.class.path" />
          <available property="javax.servlet.present"
                 classname="javax.servlet.ServletContext"
                 classpathref="project.class.path" />
          <available property="javax.sql.present"
                 classname="javax.sql.DataSource"
                 classpathref="project.class.path" />
          <available property="jdk14.present"
             classname="java.util.logging.Logger"
             classpathref="project.class.path" />
          <available property="junit.present"
                 classname="junit.framework.Test"
                 classpathref="project.class.path" />
          <available property="log4j.present"
             classname="org.apache.log4j.Level"
             classpathref="project.class.path" />
          <available property="logkit.present"
             classname="org.apache.log.Hierarchy"
             classpathref="project.class.path" />
        <available property="rhino.present"
           classname="org.mozilla.javascript.Parser"
           classpathref="project.class.path"/>
  
      </target>
  
    <!-- called if a dependency is missing which has a BSD or ASL or
         similar license for which autodownload is okay. -->
    <target name="missing-dependency">
      <echo>
      *************************************************************************
      Classes needed for compiling ${Name} against the ${id} API
      are not available. The build may fail or some functionality may
      not be available.
  
      Recovery:
      Run the build target import-${id} and it will download
      the needed jar - you should be online for that.
  
      If you have a fast connection you can also just run the build target
      import-all-auto-dependencies, which will download all external jars this
      buildfile knows about.
      Note this may download unused jars as well.
      *************************************************************************
      </echo>
      <echo/>
    </target>
  
    <!-- called if a dependency is missing which we cannot autodownload
         due to licensing-->
    <target name="missing-dependency-noauto">
      <echo>
      *************************************************************************
      Classes needed for compiling ${Name} against the ${id} API
      are not available. The build may fail or some functionality may
      not be available.
  
      Recovery:
      Get the ${id} jar from the ${id} distribution at
      ${dist.url}
      and place it in ${lib.dir}.
  
      Please note that ${id} is licensed under the ${license}
      and that by downloading it you are agreeing to that license. You can read
      this license at
      ${license.url}
      *************************************************************************
      </echo>
      <echo/>
    </target>
  
    <!-- this target fetches a file from a maven repository -->
    <target name="import-dependency">
      <get src="${license.url}"
          dest="${lib.dir}/${id}.LICENSE.html"
          usetimestamp="true"/>
      <loadfile property="license.text" 
srcFile="${lib.dir}/${id}.LICENSE.html"/>
      <echo>
      *************************************************************************
      You have requested to download the ${id} jar, which is licensed under
      the ${license}. A copy of this license has been saved to
      ${lib.dir}/${id}.LICENSE.html
  
      Please view it now.
      *************************************************************************
      </echo>
      <input message="Do you agree to the terms of this license?"
          validargs="y,n"
          addproperty="do.download"/>
  
      <condition property="do.abort">
          <equals arg1="n" arg2="${do.download}"/>
      </condition>
      <fail if="do.abort">Download aborted.</fail>
  
      <get src="${jar.repository}/${id}/jars/${jar.id}-${version}.jar"
          dest="${lib.dir}/${id}-${version}.jar"
          verbose="true"
          usetimestamp="true"/>
    </target>
  
    <target name="check-all-dependencies"
            
depends="bsf-check,checkstyle-check,javamail-check,jdbc-check,jms-check,junit-check,log4j-check,logkit-check,servletapi-check,commons-logging-check"
        description="checks for all dependencies we know about. This is usually 
more than the minimum needed to build this project; actual use is not 
recommended."/>
    <target name="import-all-auto-dependencies"
            
depends="import-bsf,import-checkstyle,import-junit,import-log4j,import-logkit,import-commons-logging"
        description="checks for all dependencies we know about. This is usually 
more than the minimum needed to build this project; actual use is not 
recommended."/>
  
    <!-- BSF -->
    <target name="bsf-check" unless="bsf.present" depends="setup-dependencies">
      <antcall target="missing-dependency">
        <param name="id" value="${bsf.id}"/>
        <param name="jar.id" value="${bsf.id}"/>
        <param name="version" value="${bsf.version}"/>
        <param name="license" value="${bsf.license}"/>
        <param name="license.url" value="${bsf.license.url}"/>
      </antcall>
    </target>
  
    <target name="import-bsf" depends="setup-dependencies">
      <antcall target="import-dependency">
        <param name="id" value="${bsf.id}"/>
        <param name="jar.id" value="${bsf.id}"/>
        <param name="version" value="${bsf.version}"/>
        <param name="license" value="${bsf.license}"/>
        <param name="license.url" value="${bsf.license.url}"/>
      </antcall>
    </target>
  
    <!-- Checkstyle -->
    <target name="checkstyle-check" unless="checkstyle.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency">
        <param name="id" value="${checkstyle.id}"/>
        <param name="jar.id" value="${checkstyle.id}"/>
        <param name="version" value="${checkstyle.version}"/>
        <param name="license" value="${checkstyle.license}"/>
        <param name="license.url" value="${checkstyle.license.url}"/>
      </antcall>
    </target>
  
    <target name="import-checkstyle" depends="setup-dependencies">
      <antcall target="import-dependency">
        <param name="id" value="${checkstyle.id}"/>
        <param name="jar.id" value="${checkstyle.id}"/>
        <param name="version" value="${checkstyle.version}"/>
        <param name="license" value="${checkstyle.license}"/>
        <param name="license.url" value="${checkstyle.license.url}"/>
      </antcall>
    </target>
  
    <!-- Commons-Logging -->
    <target name="commons-logging-check" unless="commons-logging.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency">
        <param name="id" value="${commons-logging.id}"/>
        <param name="jar.id" value="${commons-logging.id}"/>
        <param name="version" value="${commons-logging.version}"/>
        <param name="license" value="${commons-logging.license}"/>
        <param name="license.url" value="${commons-logging.license.url}"/>
      </antcall>
    </target>
  
    <target name="import-commons-logging" depends="setup-dependencies">
      <antcall target="import-dependency">
        <param name="id" value="${commons-logging.id}"/>
        <param name="jar.id" value="${commons-logging.id}"/>
        <param name="version" value="${commons-logging.version}"/>
        <param name="license" value="${commons-logging.license}"/>
        <param name="license.url" value="${commons-logging.license.url}"/>
      </antcall>
    </target>
  
    <!-- JavaMail -->
    <target name="javamail-check" unless="javax.mail.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency-noauto">
        <param name="id" value="${javamail.id}"/>
        <param name="jar.id" value="${javamail.id}"/>
        <param name="version" value="${javamail.version}"/>
        <param name="license" value="${javamail.license}"/>
        <param name="license.url" value="${javamail.license.url}"/>
        <param name="dist.url" value="${javamail.dist.url}"/>
      </antcall>
    </target>
  
    <!-- JDBC -->
    <target name="jdbc-check" unless="javax.sql.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency-noauto">
        <param name="id" value="${jdbc.id}"/>
        <param name="jar.id" value="${jdbc.id}"/>
        <param name="version" value="${jl.version}"/>
        <param name="license" value="${jdbc.license}"/>
        <param name="license.url" value="${jdbc.license.url}"/>
        <param name="dist.url" value="${jdbc.dist.url}"/>
      </antcall>
    </target>
  
    <!-- JDK 1.4 -->
    <target name="jdk14-check" unless="jdk14.present" 
depends="setup-dependencies">
      
<echo>*************************************************************************</echo>
      <echo>*</echo>
      <echo>*  You are compiling with a pre-1.4 jdk. Some functionality may not 
be</echo>
      <echo>*  available.</echo>
      
<echo>*************************************************************************</echo>
      <echo/>
    </target>
  
    <!-- JMS -->
    <target name="jms-check" unless="javax.jms.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency-noauto">
        <param name="id" value="${jms.id}"/>
        <param name="jar.id" value="${jms.id}"/>
        <param name="version" value="${jms.version}"/>
        <param name="license" value="${jms.license}"/>
        <param name="license.url" value="${jms.license.url}"/>
        <param name="dist.url" value="${jms.dist.url}"/>
      </antcall>
    </target>
  
    <!-- JUnit -->
    <target name="junit-check" unless="junit.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency">
        <param name="id" value="${junit.id}"/>
        <param name="jar.id" value="${junit.id}"/>
        <param name="version" value="${junit.version}"/>
        <param name="license" value="${junit.license}"/>
        <param name="license.url" value="${junit.license.url}"/>
      </antcall>
    </target>
  
    <target name="import-junit" depends="setup-dependencies">
      <antcall target="import-dependency">
        <param name="id" value="${junit.id}"/>
        <param name="jar.id" value="${junit.id}"/>
        <param name="version" value="${junit.version}"/>
        <param name="license" value="${junit.license}"/>
        <param name="license.url" value="${junit.license.url}"/>
      </antcall>
    </target>
  
    <!-- Log4J -->
    <target name="log4j-check" unless="log4j.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency">
        <param name="id" value="${log4j.id}"/>
        <param name="jar.id" value="${log4j.id}"/>
        <param name="version" value="${log4j.version}"/>
        <param name="license" value="${log4j.license}"/>
        <param name="license.url" value="${log4j.license.url}"/>
      </antcall>
    </target>
  
    <target name="import-log4j" depends="setup-dependencies">
      <antcall target="import-dependency">
        <param name="id" value="${log4j.id}"/>
        <param name="jar.id" value="${log4j.id}"/>
        <param name="version" value="${log4j.version}"/>
        <param name="license" value="${log4j.license}"/>
        <param name="license.url" value="${log4j.license.url}"/>
      </antcall>
    </target>
  
    <!-- Logkit -->
    <target name="logkit-check" unless="logkit.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency">
        <param name="id" value="${logkit.id}"/>
        <param name="jar.id" value="${logit.id}"/>
        <param name="version" value="${logkit.version}"/>
        <param name="license" value="${logkit.license}"/>
        <param name="license.url" value="${logkit.license.url}"/>
      </antcall>
    </target>
  
    <target name="import-logkit" depends="setup-dependencies">
      <antcall target="import-dependency">
        <param name="id" value="${logkit.id}"/>
        <param name="jar.id" value="${logkit.id}"/>
        <param name="version" value="${logkit.version}"/>
        <param name="license" value="${logkit.license}"/>
        <param name="license.url" value="${logkit.license.url}"/>
      </antcall>
    </target>
  
    <!-- Rhino -->
    <target name="rhino-check" unless="rhino.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency">
        <param name="id" value="${rhino.id}"/>
        <param name="jar.id" value="${rhino.jar.id}"/>
        <param name="version" value="${rhino.version}"/>
        <param name="license" value="${rhino.license}"/>
        <param name="license.url" value="${rhino.license.url}"/>
      </antcall>
    </target>
  
    <target name="import-rhino" depends="setup-dependencies">
      <antcall target="import-dependency">
        <param name="id" value="${rhino.id}"/>
        <param name="jar.id" value="${rhino.jar.id}"/>
        <param name="version" value="${rhino.version}"/>
        <param name="license" value="${rhino.license}"/>
        <param name="license.url" value="${rhino.license.url}"/>
      </antcall>
    </target>
  
    <!-- Servlet API -->
    <target name="servletapi-check" unless="javax.servlet.present" 
depends="setup-dependencies">
      <antcall target="missing-dependency-noauto">
        <param name="id" value="${servletapi.id}"/>
        <param name="jar.id" value="${servletapi.id}"/>
        <param name="version" value="${servletapi.version}"/>
        <param name="license" value="${servletapi.license}"/>
        <param name="license.url" value="${servletapi.license.url}"/>
        <param name="dist.url" value="${servletapi.dist.url}"/>
      </antcall>
    </target>
  
  
  
  1.1                  avalon-excalibur/check-targets.properties
  
  Index: check-targets.properties
  ===================================================================
  # sets up properties for the dependency download mechanism
  
  jar.repository=http://www.ibiblio.org/maven
  
  ibm.license=IBM Public License
  ibm.license.url=http://www.opensource.org/licenses/ibmpl.php
  cpl.license=Common Public License
  cpl.license.url=http://www.opensource.org/licenses/cpl.php
  asl.license=Apache Software License
  asl.license.url=http://www.opensource.org/licenses/apachepl.php
  lgpl.license=GNU Lesser General Public License
  lgpl.license.url=http://www.opensource.org/licenses/lgpl-license.php
  sbcl.license=Sun Binary Code License
  npl.license=Netzcape Public License
  npl.license.url=http://www.mozilla.org/MPL/NPL-1.1.html
  
  # BSF
  bsf.version=2.2
  bsf.id=bsf
  bsf.license=${ibm.license}
  bsf.license.url=${ibm.license.url}
  bsf.dist.url=http://www-124.ibm.com/developerworks/projects/bsf
  
  # Checkstyle
  checkstyle.version=2.4
  checkstyle.id=checkstyle
  checkstyle.license=${lgpl.license}
  checkstyle.license.url=${lgpl.license.url}
  checkstyle.dist.url=http://checkstyle.sf.net/
  
  # Commons-Logging
  commons-logging.version=1.0.2
  commons-logging.id=commons-logging
  commons-logging.license=${asl.license}
  commons-logging.license.url=${asl.license.url}
  commons-logging.dist.url=http://jakarta.apache.org/commons/logging/
  
  # JavaMail
  javamail.version=1.2
  javamail.id=javamail
  javamail.license=${sbcl.license}
  javamail.license.url=http://java.sun.com/products/javamail/
  javamail.dist.url=http://java.sun.com/products/javamail/
  
  # JDBC
  jdbc.version=2.0
  jdbc.id=jdbc
  jdbc.license=${sbcl.license}
  jdbc.license.url=http://java.sun.com/products/jdbc/
  jdbc.dist.url=http://java.sun.com/products/jdbc/
  
  # JMS
  jms.version=1.0.2
  jms.id=jms
  jms.license=${sbcl.license}
  jms.license.url=http://java.sun.com/products/jms/
  jms.dist.url=http://java.sun.com/products/jms/
  
  # JUnit
  junit.version=3.7
  junit.id=junit
  junit.license=${cpl.license}
  junit.license.url=${cpl.license.url}
  junit.dist.url=http://www.junit.org/
  
  # Log4J
  log4j.version=1.2.7
  log4j.id=log4j
  log4j.license=${asl.license}
  log4j.license.url=${asl.license.url}
  log4j.dist.url=http://jakarta.apache.org/log4j/
  
  # Logkit
  logkit.version=1.0.1
  logkit.id=logkit
  logkit.license=${asl.license}
  logkit.license.url=${asl.license.url}
  logkit.dist.url=http://avalon.apache.org/logkit/
  
  # Rhino
  rhino.version=1.5R4-RC3
  rhino.id=rhino
  rhino.jar.id=js
  rhino.license=${npl.license}
  rhino.license.url=${npl.license.url}
  rhino.dist.urlftp:ftp://ftp.mozilla.org/pub/js/
  
  # Servlet API
  servletapi.version=2.2
  servletapi.id=servletapi
  servletapi.license=${sbcl.license}
  servletapi.license.url=http://java.sun.com/products/servlet/
  servletapi.dist.url=http://java.sun.com/products/servlet/
  
  
  
  
  
  1.43      +11 -0     avalon-excalibur/event/build.xml
  
  Index: build.xml
  ===================================================================
  RCS file: /home/cvs/avalon-excalibur/event/build.xml,v
  retrieving revision 1.42
  retrieving revision 1.43
  diff -u -r1.42 -r1.43
  --- build.xml 29 Jan 2003 14:07:39 -0000      1.42
  +++ build.xml 11 Feb 2003 04:40:26 -0000      1.43
  @@ -1,5 +1,9 @@
   <?xml version="1.0"?>
   
  +<!DOCTYPE project [
  +  <!ENTITY check-targets SYSTEM "file:../check-targets.ent">
  +]>
  +
   <project name="Excalibur Event" default="main" basedir=".">
   
       <property file="${user.home}/build.properties"/>
  @@ -18,6 +22,9 @@
           <pathelement location="${avalon-framework.jar}"/>
           <pathelement location="${checkstyle.jar}"/>
           <pathelement path="${java.class.path}"/>
  +        <fileset dir="${lib.dir}">
  +            <include name="*.jar" />
  +        </fileset>
       </path>
   
       <path id="tools.class.path">
  @@ -38,6 +45,9 @@
       </path>
       <property name="cp" refid="test.class.path"/>
   
  +    <target name="check-dependencies" depends="junit-check"/>
  +    <target name="import-dependencies" depends="import-junit"/>
  +
       <target name="main" depends="jar" description="Build the project"/>
       <target name="rebuild" depends="clean,main" description="Rebuild the 
project"/>
   
  @@ -391,4 +401,5 @@
           <delete dir="${dist.base}" />
       </target>
   
  +    &check-targets;
   </project>
  
  
  
  1.36      +2 -0      avalon-excalibur/event/default.properties
  
  Index: default.properties
  ===================================================================
  RCS file: /home/cvs/avalon-excalibur/event/default.properties,v
  retrieving revision 1.35
  retrieving revision 1.36
  diff -u -r1.35 -r1.36
  --- default.properties        6 Feb 2003 13:38:30 -0000       1.35
  +++ default.properties        11 Feb 2003 04:40:26 -0000      1.36
  @@ -112,4 +112,6 @@
   #  property indicating directory where all distribution archives are placed
   dist.base = distributions
   
  +lib.dir = lib
  +
   depchecker.prefix=.
  
  
  
  1.6       +47 -0     
avalon-excalibur/event/src/java/org/apache/excalibur/event/Queue.java
  
  Index: Queue.java
  ===================================================================
  RCS file: 
/home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/Queue.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- Queue.java        10 Nov 2002 03:39:19 -0000      1.5
  +++ Queue.java        11 Feb 2003 04:40:26 -0000      1.6
  @@ -61,8 +61,55 @@
    * </p>
    *
    * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
  + * @author <a href="mailto:[EMAIL PROTECTED]">schierma</a>
    */
   public interface Queue extends Source, Sink
   {
       String ROLE = Queue.class.getName();
  +
  +    /**
  +     * Set the enqueue predicate for this sink. This mechanism
  +     * allows user to define a method that will 'screen'
  +     * QueueElementIF's during the enqueue procedure to either
  +     * accept or reject them. The enqueue predicate runs in the
  +     * context of the caller of [EMAIL PROTECTED] #enqueue(Object)},
  +     * which means it must be simple and fast. This can be used
  +     * to implement many interesting m_sink-thresholding policies,
  +     * such as simple count threshold, credit-based mechanisms,
  +     * and more.
  +     * @since Feb 10, 2003
  +     *
  +     * @param enqueuePredicate
  +     *  the enqueue predicate for this sink
  +     */
  +    public void setEnqueuePredicate(EnqueuePredicate enqueuePredicate);
  +
  +    /**
  +     * Return the enqueue predicate for this sink.
  +     * @since Feb 10, 2003
  +     *
  +     * @return [EMAIL PROTECTED] EnqueuePredicate}
  +     *  the enqueue predicate for this sink.
  +     */
  +    public EnqueuePredicate getEnqueuePredicate();
  +
  +    /**
  +     * Set the dequeue executable for this sink. This mechanism
  +     * allows users to define a methods that will be executed
  +     * before or after dequeuing elements from a source
  +     * @since Feb 10, 2003
  +     *
  +     * @param executable
  +     *  The dequeue executable for this sink.
  +     */
  +    public void setDequeueInterceptor(DequeueInterceptor executable);
  +
  +    /**
  +     * Return the dequeue executable for this sink.
  +     * @since Feb 10, 2003
  +     *
  +     * @return [EMAIL PROTECTED] DequeueInterceptor}
  +     *  The dequeue executable for this sink.
  +     */
  +    public DequeueInterceptor getDequeueInterceptor();
   }
  
  
  
  1.16      +13 -0     
avalon-excalibur/event/src/java/org/apache/excalibur/event/Sink.java
  
  Index: Sink.java
  ===================================================================
  RCS file: 
/home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/Sink.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- Sink.java 30 Sep 2002 17:49:13 -0000      1.15
  +++ Sink.java 11 Feb 2003 04:40:26 -0000      1.16
  @@ -155,6 +155,8 @@
        * thresholding, and does not always accurately report maxSize().
        *
        * @return -1 if the sink has no length threshold.
  +     *
  +     * @deprecated  Use the EnqueuePredicate to control this instead.
        */
       int maxSize();
   
  @@ -166,6 +168,8 @@
        * fail, since the Sink may be serviced in the meantime.
        *
        * @return true if the Sink is full
  +     *
  +     * @deprecated  Use the EnqueuePredicate to control this instead
        */
       boolean isFull();
   
  @@ -175,11 +179,20 @@
        * <code>maxSize()</code>.  It will return -1 if the sink is unbounded.
        *
        * @return the number of elements the Sink can accept
  +     *
  +     * @deprecated  Use the EnqueuePredicate to control this instead.
        */
       int canAccept();
   
       /**
        * Returns the number of elements waiting in this Sink.
  +     *
  +     * <p><span style="color: blue;"><i>Important:</i></span>
  +     *   The contract for this method was updated to account for any elements
  +     *   that were prepared for enqueueing.  It provides a more predictable
  +     *   and consistent environment, as well as making it easier for
  +     *   EnqueuePredicates to account for those elements.
  +     * </p>
        *
        * @return the number of elements in the Sink
        */
  
  
  
  1.1                  
avalon-excalibur/event/src/java/org/apache/excalibur/event/DequeueInterceptor.java
  
  Index: DequeueInterceptor.java
  ===================================================================
  /*
  
   ============================================================================
                     The Apache Software License, Version 1.1
   ============================================================================
  
   Copyright (C) @year@ The Apache Software Foundation. All rights reserved.
  
   Redistribution and use in source and binary forms, with or without modifica-
   tion, are permitted provided that the following conditions are met:
  
   1. Redistributions of  source code must  retain the above copyright  notice,
      this list of conditions and the following disclaimer.
  
   2. Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.
  
   3. The end-user documentation included with the redistribution, if any, must
      include  the following  acknowledgment:  "This product includes  software
      developed  by the  Apache Software Foundation  (http://www.apache.org/)."
      Alternately, this  acknowledgment may  appear in the software itself,  if
      and wherever such third-party acknowledgments normally appear.
  
   4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software 
Foundation"
      must not be used to endorse or promote products derived from this  
software
      without  prior written permission. For written permission, please contact
      [EMAIL PROTECTED]
  
   5. Products  derived from this software may not  be called "Apache", nor may
      "Apache" appear  in their name,  without prior written permission  of the
      Apache Software Foundation.
  
   THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
   INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
   FITNESS  FOR A PARTICULAR  PURPOSE ARE  DISCLAIMED.  IN NO  EVENT SHALL  THE
   APACHE SOFTWARE  FOUNDATION  OR ITS CONTRIBUTORS  BE LIABLE FOR  ANY DIRECT,
   INDIRECT, INCIDENTAL, SPECIAL,  EXEMPLARY, OR CONSEQUENTIAL  DAMAGES (INCLU-
   DING, BUT NOT LIMITED TO, PROCUREMENT  OF SUBSTITUTE GOODS OR SERVICES; LOSS
   OF USE, DATA, OR  PROFITS; OR BUSINESS  INTERRUPTION)  HOWEVER CAUSED AND ON
   ANY  THEORY OF LIABILITY,  WHETHER  IN CONTRACT,  STRICT LIABILITY,  OR TORT
   (INCLUDING  NEGLIGENCE OR  OTHERWISE) ARISING IN  ANY WAY OUT OF THE  USE OF
   THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  
   This software  consists of voluntary contributions made  by many individuals
   on  behalf of the Apache Software  Foundation. For more  information on the
   Apache Software Foundation, please see <http://www.apache.org/>.
  
  */
  package org.apache.excalibur.event;
  
  /**
   * The dequeue executable interface describes operations that
   * are executed before and after elements are pulled from a
   * queue.
   *
   * @version $Revision: 1.1 $
   * @author  <a href="mailto:[EMAIL PROTECTED]">schierma</a>
   */
  public interface DequeueInterceptor
  {
  
      /**
       * An operation executed before dequeing events from
       * the queue. The Source is passed in so the implementation
       * can determine to execute based on the queue properties.
       *
       * <p>
       *   This method is called once at the beginning of any 
<code>dequeue</code>
       *   method regardless of how many queue elements are dequeued.
       * </p>
       *
       * @since Feb 10, 2003
       *
       * @param context  The source from which the dequeue is performed.
       */
      public void before(Source context);
  
      /**
       * An operation executed after dequeing events from
       * the queue. The Source is passed in so the implementation
       * can determine to execute based on the queue properties.
       *
       * <p>
       *   This method is called once at the end of any <code>dequeue</code>
       *   method regardless of how many queue elements are dequeued.
       * </p>
       *
       * @since Feb 10, 2003
       *
       * @param context  The source from which the dequeue is performed.
       */
      public void after(Source context);
  }
  
  
  1.1                  
avalon-excalibur/event/src/java/org/apache/excalibur/event/EnqueuePredicate.java
  
  Index: EnqueuePredicate.java
  ===================================================================
  /*
  
   ============================================================================
                     The Apache Software License, Version 1.1
   ============================================================================
  
   Copyright (C) @year@ The Apache Software Foundation. All rights reserved.
  
   Redistribution and use in source and binary forms, with or without modifica-
   tion, are permitted provided that the following conditions are met:
  
   1. Redistributions of  source code must  retain the above copyright  notice,
      this list of conditions and the following disclaimer.
  
   2. Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.
  
   3. The end-user documentation included with the redistribution, if any, must
      include  the following  acknowledgment:  "This product includes  software
      developed  by the  Apache Software Foundation  (http://www.apache.org/)."
      Alternately, this  acknowledgment may  appear in the software itself,  if
      and wherever such third-party acknowledgments normally appear.
  
   4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software 
Foundation"
      must not be used to endorse or promote products derived from this  
software
      without  prior written permission. For written permission, please contact
      [EMAIL PROTECTED]
  
   5. Products  derived from this software may not  be called "Apache", nor may
      "Apache" appear  in their name,  without prior written permission  of the
      Apache Software Foundation.
  
   THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
   INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
   FITNESS  FOR A PARTICULAR  PURPOSE ARE  DISCLAIMED.  IN NO  EVENT SHALL  THE
   APACHE SOFTWARE  FOUNDATION  OR ITS CONTRIBUTORS  BE LIABLE FOR  ANY DIRECT,
   INDIRECT, INCIDENTAL, SPECIAL,  EXEMPLARY, OR CONSEQUENTIAL  DAMAGES (INCLU-
   DING, BUT NOT LIMITED TO, PROCUREMENT  OF SUBSTITUTE GOODS OR SERVICES; LOSS
   OF USE, DATA, OR  PROFITS; OR BUSINESS  INTERRUPTION)  HOWEVER CAUSED AND ON
   ANY  THEORY OF LIABILITY,  WHETHER  IN CONTRACT,  STRICT LIABILITY,  OR TORT
   (INCLUDING  NEGLIGENCE OR  OTHERWISE) ARISING IN  ANY WAY OUT OF THE  USE OF
   THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  
   This software  consists of voluntary contributions made  by many individuals
   on  behalf of the Apache Software  Foundation. For more  information on the
   Apache Software Foundation, please see <http://www.apache.org/>.
  
  */
  package org.apache.excalibur.event;
  
  import org.apache.excalibur.event.Sink;
  
  /**
   * Enqueue predicates allow users to specify a method that
   * will 'screen' elements being enqueued onto a sink, either
   * accepting or rejecting them. This mechanism can be used
   * to implement many interesting load-conditioning policies,
   * for example, simple thresholding, rate control, credit-based
   * flow control, and so forth. Note that the enqueue predicate
   * runs in the context of the <b>caller of enqueue()</b>, which
   * means it must be simple and fast.
   *
   * @version $Revision: 1.1 $
   * @author  <a href="mailto:[EMAIL PROTECTED]">schierma</a>
   */
  public interface EnqueuePredicate
  {
      /**
       * Tests the given element for acceptance onto the m_sink.
       * @since Feb 10, 2003
       *
       * @param  element  The element to enqueue
       * @param  modifyingSing  The sink that is used for this predicate
       * @return
       *  <code>true</code> if the sink accepts the element;
       *  <code>false</code> otherwise.
       */
      boolean accept(Object element, Sink modifyingSink);
  
      /**
       * Tests the given element for acceptance onto the m_sink.
       * @since Feb 10, 2003
       *
       * @param  elements  The array of elements to enqueue
       * @param  modifyingSing  The sink that is used for this predicate
       * @return
       *  <code>true</code> if the sink accepts all the elements;
       *  <code>false</code> otherwise.
       */
      boolean accept(Object elements[], Sink modifyingSink);
  }
  
  
  1.4       +50 -0     
avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/AbstractQueue.java
  
  Index: AbstractQueue.java
  ===================================================================
  RCS file: 
/home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/AbstractQueue.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- AbstractQueue.java        30 Sep 2002 16:17:01 -0000      1.3
  +++ AbstractQueue.java        11 Feb 2003 04:40:26 -0000      1.4
  @@ -50,6 +50,8 @@
   package org.apache.excalibur.event.impl;
   
   import org.apache.excalibur.event.Queue;
  +import org.apache.excalibur.event.EnqueuePredicate;
  +import org.apache.excalibur.event.DequeueInterceptor;
   
   /**
    * Provides the base functionality for the other <code>Queue</code> types.
  @@ -63,6 +65,8 @@
       protected final static Object[] EMPTY_ARRAY = new Object[ 0 ];
       /** The number of milliseconds to wait */
       protected long m_timeout = 0;
  +    protected EnqueuePredicate m_predicate = new NullEnqueuePredicate();
  +    protected DequeueInterceptor m_interceptor = new 
NullDequeueInterceptor();
   
       /**
        * Default for canAccept()
  @@ -140,5 +144,51 @@
                   }
               }
           }
  +    }
  +
  +    /**
  +     * Set the EnqueuePredicate to limit entries into this Queue.
  +     */
  +    public void setEnqueuePredicate( EnqueuePredicate predicate )
  +    {
  +        if ( null == predicate ) throw new NullPointerException( "predicate" 
);
  +
  +        m_predicate = predicate;
  +    }
  +
  +    /**
  +     * Return the EnqueuePredicate that is already set for this Queue.
  +     */
  +    public EnqueuePredicate getEnqueuePredicate()
  +    {
  +        return m_predicate;
  +    }
  +
  +    /**
  +     * Set the dequeue executable for this sink. This mechanism
  +     * allows users to define a methods that will be executed
  +     * before or after dequeuing elements from a source
  +     * @since Sep 23, 2002
  +     *
  +     * @param executable
  +     *  The dequeue executable for this sink.
  +     */
  +    public void setDequeueInterceptor(DequeueInterceptor executable)
  +    {
  +        if ( null == executable ) throw new NullPointerException( 
"executable" );
  +
  +        m_interceptor = executable;
  +    }
  +
  +    /**
  +     * Return the dequeue executable for this sink.
  +     * @since Sep 23, 2002
  +     *
  +     * @return [EMAIL PROTECTED] DequeueInterceptor}
  +     *  The dequeue executable for this sink.
  +     */
  +    public DequeueInterceptor getDequeueInterceptor()
  +    {
  +        return m_interceptor;
       }
   }
  
  
  
  1.6       +36 -27    
avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/DefaultQueue.java
  
  Index: DefaultQueue.java
  ===================================================================
  RCS file: 
/home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/DefaultQueue.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- DefaultQueue.java 2 Oct 2002 17:49:20 -0000       1.5
  +++ DefaultQueue.java 11 Feb 2003 04:40:26 -0000      1.6
  @@ -52,6 +52,7 @@
   import org.apache.commons.collections.Buffer;
   import org.apache.commons.collections.UnboundedFifoBuffer;
   import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
  +import org.apache.excalibur.event.EnqueuePredicate;
   import org.apache.excalibur.event.PreparedEnqueue;
   import org.apache.excalibur.event.SinkException;
   import org.apache.excalibur.event.SinkFullException;
  @@ -81,22 +82,17 @@
        */
       public DefaultQueue( int size )
       {
  -        int maxSize;
  +        this( new ThresholdEnqueuePredicate( size ) );
  +    }
   
  -        if( size > 0 )
  -        {
  -            m_elements = new UnboundedFifoBuffer( size );
  -            maxSize = size;
  -        }
  -        else
  -        {
  -            m_elements = new UnboundedFifoBuffer();
  -            maxSize = -1;
  -        }
  +    public DefaultQueue( EnqueuePredicate predicate )
  +    {
  +        setEnqueuePredicate( predicate );
   
           m_mutex = new ReentrantLock();
  +        m_elements = new UnboundedFifoBuffer();
           m_reserve = 0;
  -        m_maxSize = maxSize;
  +        m_maxSize = -1;
       }
   
       /**
  @@ -104,17 +100,17 @@
        */
       public DefaultQueue()
       {
  -        this( -1 );
  +        this( new NullEnqueuePredicate() );
       }
   
       /**
        * Return the number of elements currently in the <code>Queue</code>.
        *
  -     * @return <code>int</code> representing the number of elements.
  +     * @return <code>int</code> representing the number of elements 
(including the reserved ones).
        */
       public int size()
       {
  -        return m_elements.size();
  +        return m_elements.size() + m_reserve;
       }
   
       /**
  @@ -139,13 +135,14 @@
               m_mutex.acquire();
               try
               {
  -
  -                if( maxSize() > 0 && elements.length + m_reserve + size() > 
maxSize() )
  +                if( getEnqueuePredicate().accept(elements, this) )
  +                {
  +                    enqueue = new DefaultPreparedEnqueue( this, elements );
  +                }
  +                else
                   {
                       throw new SinkFullException( "Not enough room to enqueue 
these elements." );
                   }
  -
  -                enqueue = new DefaultPreparedEnqueue( this, elements );
               }
               finally
               {
  @@ -154,6 +151,10 @@
           }
           catch( InterruptedException ie )
           {
  +            if ( null == enqueue )
  +            {
  +                throw new SinkException("The mutex was interrupted before it 
could be released");
  +            }
           }
   
           return enqueue;
  @@ -168,14 +169,12 @@
               m_mutex.acquire();
               try
               {
  +                success = getEnqueuePredicate().accept( element, this );
   
  -                if( maxSize() > 0 && 1 + m_reserve + size() > maxSize() )
  +                if ( success )
                   {
  -                    return false;
  +                    m_elements.add( element );
                   }
  -
  -                m_elements.add( element );
  -                success = true;
               }
               finally
               {
  @@ -199,7 +198,7 @@
               m_mutex.acquire();
               try
               {
  -                if( maxSize() > 0 && elements.length + m_reserve + size() > 
maxSize() )
  +                if( ! getEnqueuePredicate().accept( elements, this ) )
                   {
                       throw new SinkFullException( "Not enough room to enqueue 
these elements." );
                   }
  @@ -227,7 +226,7 @@
               m_mutex.acquire();
               try
               {
  -                if( maxSize() > 0 && 1 + m_reserve + size() > maxSize() )
  +                if( ! getEnqueuePredicate().accept(element, this) )
                   {
                       throw new SinkFullException( "Not enough room to enqueue 
these elements." );
                   }
  @@ -246,6 +245,7 @@
   
       public Object[] dequeue( final int numElements )
       {
  +        getDequeueInterceptor().before(this);
           Object[] elements = EMPTY_ARRAY;
   
           try
  @@ -266,13 +266,16 @@
           }
           catch( InterruptedException ie )
           {
  +            //TODO: exception handling
           }
   
  +        getDequeueInterceptor().after(this);
           return elements;
       }
   
       public Object[] dequeueAll()
       {
  +        getDequeueInterceptor().before(this);
           Object[] elements = EMPTY_ARRAY;
   
           try
  @@ -291,8 +294,10 @@
           }
           catch( InterruptedException ie )
           {
  +            // TODO: exception hanlding
           }
   
  +        getDequeueInterceptor().after(this);
           return elements;
       }
   
  @@ -321,6 +326,7 @@
   
       public Object dequeue()
       {
  +        getDequeueInterceptor().before(this);
           Object element = null;
   
           try
  @@ -342,8 +348,10 @@
           }
           catch( InterruptedException ie )
           {
  +            // TODO: exception handling
           }
   
  +        getDequeueInterceptor().after(this);
           return element;
       }
   
  @@ -356,6 +364,7 @@
           {
               m_parent = parent;
               m_elements = elements;
  +            m_parent.m_reserve += elements.length;
           }
   
           public void commit()
  @@ -367,8 +376,8 @@
   
               try
               {
  -                m_parent.enqueue( m_elements );
                   m_parent.m_reserve -= m_elements.length;
  +                m_parent.enqueue( m_elements );
                   m_elements = null;
               }
               catch( Exception e )
  
  
  
  1.6       +8 -5      
avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/FixedSizeQueue.java
  
  Index: FixedSizeQueue.java
  ===================================================================
  RCS file: 
/home/cvs/avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/FixedSizeQueue.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- FixedSizeQueue.java       2 Oct 2002 17:49:20 -0000       1.5
  +++ FixedSizeQueue.java       11 Feb 2003 04:40:26 -0000      1.6
  @@ -60,6 +60,8 @@
    * changed.
    *
    * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
  + *
  + * @deprecated Use the DefaultQueue as it properly supports the 
EnqueuePredicates
    */
   public final class FixedSizeQueue
       extends AbstractQueue
  @@ -98,7 +100,7 @@
               size = m_end - m_start;
           }
   
  -        return size;
  +        return size + m_reserve;
       }
   
       public int maxSize()
  @@ -116,7 +118,7 @@
               m_mutex.acquire();
               try
               {
  -                if( elements.length + m_reserve + size() > maxSize() )
  +                if( elements.length + size() > maxSize() )
                   {
                       throw new SinkFullException( "Not enough room to enqueue 
these elements." );
                   }
  @@ -144,7 +146,7 @@
               m_mutex.acquire();
               try
               {
  -                if( 1 + m_reserve + size() > maxSize() )
  +                if( 1 + size() > maxSize() )
                   {
                       return false;
                   }
  @@ -174,7 +176,7 @@
               m_mutex.acquire();
               try
               {
  -                if( elements.length + m_reserve + size() > maxSize() )
  +                if( elements.length + size() > maxSize() )
                   {
                       throw new SinkFullException( "Not enough room to enqueue 
these elements." );
                   }
  @@ -202,7 +204,7 @@
               m_mutex.acquire();
               try
               {
  -                if( 1 + m_reserve + size() > maxSize() )
  +                if( 1 + size() > maxSize() )
                   {
                       throw new SinkFullException( "Not enough room to enqueue 
these elements." );
                   }
  @@ -361,6 +363,7 @@
           {
               m_parent = parent;
               m_elements = elements;
  +            m_parent.m_reserve += m_elements.length;
           }
   
           public void commit()
  
  
  
  1.1                  
avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/LossyMultiCastSink.java
  
  Index: LossyMultiCastSink.java
  ===================================================================
  /*
  
   ============================================================================
                     The Apache Software License, Version 1.1
   ============================================================================
  
   Copyright (C) @year@ The Apache Software Foundation. All rights reserved.
  
   Redistribution and use in source and binary forms, with or without modifica-
   tion, are permitted provided that the following conditions are met:
  
   1. Redistributions of  source code must  retain the above copyright  notice,
      this list of conditions and the following disclaimer.
  
   2. Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.
  
   3. The end-user documentation included with the redistribution, if any, must
      include  the following  acknowledgment:  "This product includes  software
      developed  by the  Apache Software Foundation  (http://www.apache.org/)."
      Alternately, this  acknowledgment may  appear in the software itself,  if
      and wherever such third-party acknowledgments normally appear.
  
   4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software 
Foundation"
      must not be used to endorse or promote products derived from this  
software
      without  prior written permission. For written permission, please contact
      [EMAIL PROTECTED]
  
   5. Products  derived from this software may not  be called "Apache", nor may
      "Apache" appear  in their name,  without prior written permission  of the
      Apache Software Foundation.
  
   THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
   INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
   FITNESS  FOR A PARTICULAR  PURPOSE ARE  DISCLAIMED.  IN NO  EVENT SHALL  THE
   APACHE SOFTWARE  FOUNDATION  OR ITS CONTRIBUTORS  BE LIABLE FOR  ANY DIRECT,
   INDIRECT, INCIDENTAL, SPECIAL,  EXEMPLARY, OR CONSEQUENTIAL  DAMAGES (INCLU-
   DING, BUT NOT LIMITED TO, PROCUREMENT  OF SUBSTITUTE GOODS OR SERVICES; LOSS
   OF USE, DATA, OR  PROFITS; OR BUSINESS  INTERRUPTION)  HOWEVER CAUSED AND ON
   ANY  THEORY OF LIABILITY,  WHETHER  IN CONTRACT,  STRICT LIABILITY,  OR TORT
   (INCLUDING  NEGLIGENCE OR  OTHERWISE) ARISING IN  ANY WAY OUT OF THE  USE OF
   THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  
   This software  consists of voluntary contributions made  by many individuals
   on  behalf of the Apache Software  Foundation. For more  information on the
   Apache Software Foundation, please see <http://www.apache.org/>.
  
  */
  package org.apache.excalibur.event.impl;
  
  import java.util.Collection;
  import java.util.Iterator;
  import java.util.LinkedList;
  
  import org.apache.excalibur.event.PreparedEnqueue;
  import org.apache.excalibur.event.Sink;
  import org.apache.excalibur.event.SinkException;
  import org.apache.excalibur.event.SinkFullException;
  
  /**
   * This is a [EMAIL PROTECTED] org.apache.excalibur.event.seda.event.Sink}
   * implementation that multicasts enqueue operations to the
   * contained and concrete sink objects.  Compared to the
   * regular [EMAIL PROTECTED] 
org.apache.excalibur.event.seda.event.impl.MultiCastSink}
   * this sink works in that it delivers zero, one or more sinks.
   * It can be configured to fail when less than one sink was
   * delivered to.
   *
   * @version $Revision: 1.1 $
   * @author  <a href="mailto:[EMAIL PROTECTED]">schierma</a>
   */
  public class LossyMultiCastSink
  {
      /**
       * A collection of sink arrays representing the
       * sinks to enqueue the element to.
       */
      private final Collection m_sinks;
  
      /**
       * The size of the sink.
       */
      private final int m_size;
  
      /**
       * indicates if at least one enqueue operation must succeed.
       */
      private final boolean m_oneSuccess;
  
      //---------------------- LossyMultiCastSink constructors
      /**
       * This constructor creates a failure tolerant sink
       * based on the collection of sink arrays. None of
       * the enqueue operations must succeed.
       * @since May 16, 2002
       *
       * @param sinks
       *  A collection of sink arrays for each stage.
       */
      public LossyMultiCastSink(Collection sinks)
      {
          this(sinks, false);
      }
  
      /**
       * This constructor creates a failure tolerant sink
       * based on the collection of sink arrays. The additional
       * boolean flag describes whether at least one or none
       * of the enqueue operations must succeed.
       * @since May 16, 2002
       *
       * @param sinks
       *  A collection of sink arrays for each stage.
       */
      public LossyMultiCastSink(Collection sinks, boolean oneSuccess)
      {
          m_sinks = sinks;
          m_size = -1;
          m_oneSuccess = oneSuccess;
      }
  
      //---------------------- Sink implementation
      /**
       * @see Sink#canAccept()
       */
      public int canAccept()
      {
          return 0;
      }
  
      /**
       * @see Sink#isFull()
       */
      public boolean isFull()
      {
          return false;
      }
  
      /**
       * @see Sink#maxSize()
       */
      public int maxSize()
      {
          return 0;
      }
  
      /**
       * @see Sink#enqueue(Object)
       */
      public void enqueue(Object element) throws SinkException
      {
          final Iterator sinks = m_sinks.iterator();
  
          int successful = 0;
  
          //checkEnqueuePredicate(new Object[] { element });
  
          // iterate through the sinks and try to enqueue
          while (sinks.hasNext())
          {
              final Sink sink = (Sink) sinks.next();
  
              final boolean enqueued = sink.tryEnqueue(element);
  
              // enqueue only to the first successful sink
              if (enqueued)
              {
                  successful++;
                  break;
              }
          }
  
          if (successful == 0 && m_oneSuccess)
          {
              throw new SinkFullException("Could not deliver one single 
element.");
          }
      }
  
      /**
       * @see Sink#enqueue(Object[])
       */
      public void enqueue(Object[] elements) throws SinkException
      {
          final Iterator sinks = m_sinks.iterator();
  
          int successful = 0;
  
          //checkEnqueuePredicate(elements);
  
          // iterate through the sinks and try to enqueue
          while (sinks.hasNext())
          {
              final Sink sink = (Sink) sinks.next();
  
              try
              {
                  sink.enqueue(elements);
              }
              catch (SinkFullException e)
              {
                  continue;
              }
  
              // if enqueue successful break here
              successful++;
              break;
          }
  
          if (successful == 0 && m_oneSuccess)
          {
              throw new SinkFullException("Could not deliver one single 
elements.");
          }
      }
  
      /**
       * @see Sink#tryEnqueue(Object)
       */
      public boolean tryEnqueue(Object element)
      {
          try
          {
              enqueue(element);
              return true;
          }
          catch (SinkException e)
          {
              return !m_oneSuccess;
          }
      }
  
      /**
       * @see Sink#prepareEnqueue(Object[])
       */
      public PreparedEnqueue prepareEnqueue(Object[] elements)
          throws SinkException
      {
          final Iterator sinks = m_sinks.iterator();
          final DefaultPreparedEnqueue prepares = new DefaultPreparedEnqueue();
  
          int successful = 0;
  
          //checkEnqueuePredicate(elements);
  
          // iterate through the sinks and try to enqueue
          while (sinks.hasNext())
          {
              final Sink sink = (Sink) sinks.next();
  
              try
              {
                  prepares.addPreparedEnqueue(sink.prepareEnqueue(elements));
              }
              catch (SinkFullException e)
              {
                  continue;
              }
  
              // if enqueue successful break here
              successful++;
              break;
          }
          if (successful == 0 && m_oneSuccess)
          {
              throw new SinkFullException("Could not deliver elements at all.");
          }
  
          return prepares;
      }
  
      /**
       * @see Sink#size()
       */
      public int size()
      {
          return m_size;
      }
  
      //------------------------- LossyMultiCastSink inner classes
      /**
       * A prepared enqueue object that holds other prepared
       * enqueue objects and allows to perform a commit / abort
       * on all of these objects.
       * @since May 16, 2002
       *
       * @author <a href = "mailto:[EMAIL PROTECTED]">schierma</a>
       */
      private static final class DefaultPreparedEnqueue
          implements PreparedEnqueue
      {
          /**
           * A collection of prepared enqueue objects
           */
          private final Collection m_preparedEnqueues = new LinkedList();
  
          //------------------------ PreparedEnqueue implementation
          /**
           * @see PreparedEnqueue#abort()
           */
          public void abort()
          {
              final Iterator iter = m_preparedEnqueues.iterator();
  
              while (iter.hasNext())
              {
                  ((PreparedEnqueue) iter.next()).abort();
              }
          }
  
          /**
           * @see PreparedEnqueue#commit()
           */
          public void commit()
          {
              final Iterator iter = m_preparedEnqueues.iterator();
  
              while (iter.hasNext())
              {
                  ((PreparedEnqueue) iter.next()).commit();
              }
          }
  
          //------------------------ DefaultPreparedEnqueue specific 
implementation
          /**
           * Adds a prepared enqueue object to the list
           * of prepared enqueues.
           * @since May 16, 2002
           *
           * @param preparedEnqueue
           *  The prepared enqueue object to be added.
           */
          public void addPreparedEnqueue(PreparedEnqueue preparedEnqueue)
          {
              m_preparedEnqueues.add(preparedEnqueue);
          }
      } //-- end DefaultPreparedEnqueue inner class
  }
  
  
  1.1                  
avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/MultiCastSink.java
  
  Index: MultiCastSink.java
  ===================================================================
  /*
  
   ============================================================================
                     The Apache Software License, Version 1.1
   ============================================================================
  
   Copyright (C) @year@ The Apache Software Foundation. All rights reserved.
  
   Redistribution and use in source and binary forms, with or without modifica-
   tion, are permitted provided that the following conditions are met:
  
   1. Redistributions of  source code must  retain the above copyright  notice,
      this list of conditions and the following disclaimer.
  
   2. Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.
  
   3. The end-user documentation included with the redistribution, if any, must
      include  the following  acknowledgment:  "This product includes  software
      developed  by the  Apache Software Foundation  (http://www.apache.org/)."
      Alternately, this  acknowledgment may  appear in the software itself,  if
      and wherever such third-party acknowledgments normally appear.
  
   4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software 
Foundation"
      must not be used to endorse or promote products derived from this  
software
      without  prior written permission. For written permission, please contact
      [EMAIL PROTECTED]
  
   5. Products  derived from this software may not  be called "Apache", nor may
      "Apache" appear  in their name,  without prior written permission  of the
      Apache Software Foundation.
  
   THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
   INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
   FITNESS  FOR A PARTICULAR  PURPOSE ARE  DISCLAIMED.  IN NO  EVENT SHALL  THE
   APACHE SOFTWARE  FOUNDATION  OR ITS CONTRIBUTORS  BE LIABLE FOR  ANY DIRECT,
   INDIRECT, INCIDENTAL, SPECIAL,  EXEMPLARY, OR CONSEQUENTIAL  DAMAGES (INCLU-
   DING, BUT NOT LIMITED TO, PROCUREMENT  OF SUBSTITUTE GOODS OR SERVICES; LOSS
   OF USE, DATA, OR  PROFITS; OR BUSINESS  INTERRUPTION)  HOWEVER CAUSED AND ON
   ANY  THEORY OF LIABILITY,  WHETHER  IN CONTRACT,  STRICT LIABILITY,  OR TORT
   (INCLUDING  NEGLIGENCE OR  OTHERWISE) ARISING IN  ANY WAY OUT OF THE  USE OF
   THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  
   This software  consists of voluntary contributions made  by many individuals
   on  behalf of the Apache Software  Foundation. For more  information on the
   Apache Software Foundation, please see <http://www.apache.org/>.
  
  */
  package org.apache.excalibur.event.ext;
  
  import java.util.Collection;
  import java.util.Iterator;
  import java.util.LinkedList;
  
  import org.apache.excalibur.event.PreparedEnqueue;
  import org.apache.excalibur.event.Sink;
  import org.apache.excalibur.event.SinkException;
  import org.apache.excalibur.event.SinkFullException;
  
  /**
   * This is a [EMAIL PROTECTED] org.apache.excalibur.event.seda.event.Sink}
   * implementation that multicasts enqueue operations to the
   * contained and concrete sink objects.  The multi cast sink
   * will try to enqueue and only succeeds if no element was
   * rejected from any sink. The sink can be configured to
   * enqueue into one sink alone or all sinks.
   * If a sink array in the collection of sinks contains more
   * than one sink the multicast sink will try to enqueue the
   * element always to <b>only one</b> of these sinks.
   *
   * @version $Revision: 1.1 $
   * @author  <a href="mailto:[EMAIL PROTECTED]">schierma</a>
   */
  public class MultiCastSink
  {
      /** A collection of sink arrays representing the  sinks to enqueue to. */
      private final Collection m_sinks;
  
      /** The size of the sink. */
      private final int m_size;
  
      /** Boolean value describing if one or all operations must succeed. */
      private final boolean m_single;
  
      //---------------------- LossyMultiCastSink constructors
      /**
       * This constructor creates a failure in-tolerant multicast
       * sink based on the collection of sink arrays. The delivery
       * must succeed for all sinks in the collection or it will
       * fail entirely.
       * @since May 16, 2002
       *
       * @param sinks
       *  A collection of sink arrays for each stage.
       */
      public MultiCastSink(Collection sinks)
      {
          this(sinks, false);
      }
  
      /**
       * This constructor creates a failure in-tolerant multicast
       * sink based on the collection of sink arrays.
       * @since May 16, 2002
       *
       * @param sinks
       *  A collection of sink arrays for each stage.
       * @param single
       *  <m_code>true</m_code> if just one operation must succeed.
       *  <m_code>false</m_code> if all operations must succeed.
       */
      public MultiCastSink(Collection sinks, boolean single)
      {
          m_sinks = sinks;
          m_size = -1;
          m_single = single;
      }
  
      //---------------------- Sink implementation
      /**
       * @see Sink#canAccept()
       */
      public int canAccept()
      {
          return 0;
      }
  
      /**
       * @see Sink#isFull()
       */
      public boolean isFull()
      {
          return false;
      }
  
      /**
       * @see Sink#maxSize()
       */
      public int maxSize()
      {
          return 0;
      }
  
      /**
       * @see Sink#enqueue(Object)
       */
      public void enqueue(Object element) throws SinkException
      {
          final PreparedEnqueue prepared;
          prepared = prepareEnqueue(new Object[] { element });
          prepared.commit();
      }
  
      /**
       * @see Sink#enqueue(Object[])
       */
      public void enqueue(Object[] elements) throws SinkException
      {
          final PreparedEnqueue prepared = prepareEnqueue(elements);
          prepared.commit();
      }
  
      /**
       * @see Sink#tryEnqueue(Object)
       */
      public boolean tryEnqueue(Object element)
      {
          try
          {
              enqueue(element);
              return true;
          }
          catch (SinkException e)
          {
              return false;
          }
      }
  
      /**
       * @see Sink#prepareEnqueue(Object[])
       */
      public PreparedEnqueue prepareEnqueue(Object[] elements)
          throws SinkException
      {
  
          //checkEnqueuePredicate(elements);
  
          final DefaultPreparedEnqueue prepares = new DefaultPreparedEnqueue();
          int successful = 0;
  
          final Iterator sinks = m_sinks.iterator();
  
          // iterate through the sinks and try to enqueue
          while (sinks.hasNext())
          {
              final Sink sink = (Sink) sinks.next();
  
              try
              {
                  prepares.addPreparedEnqueue(sink.prepareEnqueue(elements));
              }
              catch (SinkFullException e)
              {
                  continue;
              }
  
              // if enqueue successful return here or just break and continue
              if (m_single)
              {
                  return prepares;
              }
              successful++;
              break;
  
          }
          if (successful < m_sinks.size())
          {
              // rollback all enqueues.
              prepares.abort();
  
              throw new SinkFullException("Could not deliver elements.");
          }
  
          return prepares;
      }
  
      /**
       * @see Sink#size()
       */
      public int size()
      {
          return m_size;
      }
  
      //------------------------- LossyMultiCastSink inner classes
      /**
       * A prepared enqueue object that holds other prepared
       * enqueue objects and allows to perform a commit / abort
       * on all of these objects.
       * @since May 16, 2002
       *
       * @author <a href = "mailto:[EMAIL PROTECTED]">schierma</a>
       */
      private static final class DefaultPreparedEnqueue
          implements PreparedEnqueue
      {
          /**
           * A collection of prepared enqueue objects
           */
          private final Collection m_preparedEnqueues = new LinkedList();
  
          //------------------------ PreparedEnqueue implementation
          /**
           * @see PreparedEnqueue#abort()
           */
          public void abort()
          {
              final Iterator iter = m_preparedEnqueues.iterator();
  
              while (iter.hasNext())
              {
                  ((PreparedEnqueue) iter.next()).abort();
              }
          }
  
          /**
           * @see PreparedEnqueue#commit()
           */
          public void commit()
          {
              final Iterator iter = m_preparedEnqueues.iterator();
  
              while (iter.hasNext())
              {
                  ((PreparedEnqueue) iter.next()).commit();
              }
          }
  
          //------------------------ DefaultPreparedEnqueue specific 
implementation
          /**
           * Adds a prepared enqueue object to the list
           * of prepared enqueues.
           * @since May 16, 2002
           *
           * @param preparedEnqueue
           *  The prepared enqueue object to be added.
           */
          public void addPreparedEnqueue(PreparedEnqueue preparedEnqueue)
          {
              m_preparedEnqueues.add(preparedEnqueue);
          }
      } //-- end DefaultPreparedEnqueue inner class
  }
  
  
  1.1                  
avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/NullDequeueInterceptor.java
  
  Index: NullDequeueInterceptor.java
  ===================================================================
  /*
  
   ============================================================================
                     The Apache Software License, Version 1.1
   ============================================================================
  
   Copyright (C) @year@ The Apache Software Foundation. All rights reserved.
  
   Redistribution and use in source and binary forms, with or without modifica-
   tion, are permitted provided that the following conditions are met:
  
   1. Redistributions of  source code must  retain the above copyright  notice,
      this list of conditions and the following disclaimer.
  
   2. Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.
  
   3. The end-user documentation included with the redistribution, if any, must
      include  the following  acknowledgment:  "This product includes  software
      developed  by the  Apache Software Foundation  (http://www.apache.org/)."
      Alternately, this  acknowledgment may  appear in the software itself,  if
      and wherever such third-party acknowledgments normally appear.
  
   4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software 
Foundation"
      must not be used to endorse or promote products derived from this  
software
      without  prior written permission. For written permission, please contact
      [EMAIL PROTECTED]
  
   5. Products  derived from this software may not  be called "Apache", nor may
      "Apache" appear  in their name,  without prior written permission  of the
      Apache Software Foundation.
  
   THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
   INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
   FITNESS  FOR A PARTICULAR  PURPOSE ARE  DISCLAIMED.  IN NO  EVENT SHALL  THE
   APACHE SOFTWARE  FOUNDATION  OR ITS CONTRIBUTORS  BE LIABLE FOR  ANY DIRECT,
   INDIRECT, INCIDENTAL, SPECIAL,  EXEMPLARY, OR CONSEQUENTIAL  DAMAGES (INCLU-
   DING, BUT NOT LIMITED TO, PROCUREMENT  OF SUBSTITUTE GOODS OR SERVICES; LOSS
   OF USE, DATA, OR  PROFITS; OR BUSINESS  INTERRUPTION)  HOWEVER CAUSED AND ON
   ANY  THEORY OF LIABILITY,  WHETHER  IN CONTRACT,  STRICT LIABILITY,  OR TORT
   (INCLUDING  NEGLIGENCE OR  OTHERWISE) ARISING IN  ANY WAY OUT OF THE  USE OF
   THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  
   This software  consists of voluntary contributions made  by many individuals
   on  behalf of the Apache Software  Foundation. For more  information on the
   Apache Software Foundation, please see <http://www.apache.org/>.
  
  */
  package org.apache.excalibur.event.impl;
  
  import org.apache.excalibur.event.DequeueInterceptor;
  import org.apache.excalibur.event.Source;
  
  /**
   * The dequeue executable interface describes operations that
   * are executed before and after elements are pulled from a
   * queue.
   *
   * @version $Revision: 1.1 $
   * @author  <a href="mailto:[EMAIL PROTECTED]">schierma</a>
   * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
   */
  public final class NullDequeueInterceptor implements DequeueInterceptor
  {
  
      /**
       * An operation executed before dequeing events from
       * the queue. The size of the queue is passed in so the
       * implementation can determine to execute based on the
       * size of the queue.
       * @since Feb 10, 2003
       *
       * @param context
       *  The source from which the dequeue is performed.
       */
      public void before(Source context) {}
  
      /**
       * An operation executed after dequeing events from
       * the queue. The size of the queue is passed in so the
       * implementation can determine to execute based on the
       * size of the queue.
       * @since Feb 10, 2003
       *
       * @param context
       *  The source from which the dequeue is performed.
       */
      public void after(Source context) {}
  
  }
  
  
  
  1.1                  
avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/NullEnqueuePredicate.java
  
  Index: NullEnqueuePredicate.java
  ===================================================================
  /*
  
   ============================================================================
                     The Apache Software License, Version 1.1
   ============================================================================
  
   Copyright (C) @year@ The Apache Software Foundation. All rights reserved.
  
   Redistribution and use in source and binary forms, with or without modifica-
   tion, are permitted provided that the following conditions are met:
  
   1. Redistributions of  source code must  retain the above copyright  notice,
      this list of conditions and the following disclaimer.
  
   2. Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.
  
   3. The end-user documentation included with the redistribution, if any, must
      include  the following  acknowledgment:  "This product includes  software
      developed  by the  Apache Software Foundation  (http://www.apache.org/)."
      Alternately, this  acknowledgment may  appear in the software itself,  if
      and wherever such third-party acknowledgments normally appear.
  
   4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software 
Foundation"
      must not be used to endorse or promote products derived from this  
software
      without  prior written permission. For written permission, please contact
      [EMAIL PROTECTED]
  
   5. Products  derived from this software may not  be called "Apache", nor may
      "Apache" appear  in their name,  without prior written permission  of the
      Apache Software Foundation.
  
   THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
   INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
   FITNESS  FOR A PARTICULAR  PURPOSE ARE  DISCLAIMED.  IN NO  EVENT SHALL  THE
   APACHE SOFTWARE  FOUNDATION  OR ITS CONTRIBUTORS  BE LIABLE FOR  ANY DIRECT,
   INDIRECT, INCIDENTAL, SPECIAL,  EXEMPLARY, OR CONSEQUENTIAL  DAMAGES (INCLU-
   DING, BUT NOT LIMITED TO, PROCUREMENT  OF SUBSTITUTE GOODS OR SERVICES; LOSS
   OF USE, DATA, OR  PROFITS; OR BUSINESS  INTERRUPTION)  HOWEVER CAUSED AND ON
   ANY  THEORY OF LIABILITY,  WHETHER  IN CONTRACT,  STRICT LIABILITY,  OR TORT
   (INCLUDING  NEGLIGENCE OR  OTHERWISE) ARISING IN  ANY WAY OUT OF THE  USE OF
   THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  
   This software  consists of voluntary contributions made  by many individuals
   on  behalf of the Apache Software  Foundation. For more  information on the
   Apache Software Foundation, please see <http://www.apache.org/>.
  
  */
  package org.apache.excalibur.event.impl;
  
  import org.apache.excalibur.event.EnqueuePredicate;
  import org.apache.excalibur.event.Sink;
  
  /**
   * The NullEnqueuePredicate does nothing to limit a Queue's ability to 
enqueue.
   */
  public final class NullEnqueuePredicate implements EnqueuePredicate
  {
      public boolean accept(Object element, Sink modifyingSink)
      {
          return true;
      }
  
      public boolean accept(Object[] element, Sink modifyingSink)
      {
          return true;
      }
  }
  
  
  
  1.1                  
avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/RateLimitingPredicate.java
  
  Index: RateLimitingPredicate.java
  ===================================================================
  /*
  
   ============================================================================
                     The Apache Software License, Version 1.1
   ============================================================================
  
   Copyright (C) @year@ The Apache Software Foundation. All rights reserved.
  
   Redistribution and use in source and binary forms, with or without modifica-
   tion, are permitted provided that the following conditions are met:
  
   1. Redistributions of  source code must  retain the above copyright  notice,
      this list of conditions and the following disclaimer.
  
   2. Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.
  
   3. The end-user documentation included with the redistribution, if any, must
      include  the following  acknowledgment:  "This product includes  software
      developed  by the  Apache Software Foundation  (http://www.apache.org/)."
      Alternately, this  acknowledgment may  appear in the software itself,  if
      and wherever such third-party acknowledgments normally appear.
  
   4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software 
Foundation"
      must not be used to endorse or promote products derived from this  
software
      without  prior written permission. For written permission, please contact
      [EMAIL PROTECTED]
  
   5. Products  derived from this software may not  be called "Apache", nor may
      "Apache" appear  in their name,  without prior written permission  of the
      Apache Software Foundation.
  
   THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
   INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
   FITNESS  FOR A PARTICULAR  PURPOSE ARE  DISCLAIMED.  IN NO  EVENT SHALL  THE
   APACHE SOFTWARE  FOUNDATION  OR ITS CONTRIBUTORS  BE LIABLE FOR  ANY DIRECT,
   INDIRECT, INCIDENTAL, SPECIAL,  EXEMPLARY, OR CONSEQUENTIAL  DAMAGES (INCLU-
   DING, BUT NOT LIMITED TO, PROCUREMENT  OF SUBSTITUTE GOODS OR SERVICES; LOSS
   OF USE, DATA, OR  PROFITS; OR BUSINESS  INTERRUPTION)  HOWEVER CAUSED AND ON
   ANY  THEORY OF LIABILITY,  WHETHER  IN CONTRACT,  STRICT LIABILITY,  OR TORT
   (INCLUDING  NEGLIGENCE OR  OTHERWISE) ARISING IN  ANY WAY OUT OF THE  USE OF
   THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  
   This software  consists of voluntary contributions made  by many individuals
   on  behalf of the Apache Software  Foundation. For more  information on the
   Apache Software Foundation, please see <http://www.apache.org/>.
  
  */
  package org.apache.excalibur.event.impl;
  
  import org.apache.excalibur.event.EnqueuePredicate;
  import org.apache.excalibur.event.Sink;
  
  /**
   * This enqueue predicate implements input rate policing.
   *
   * @version $Revision: 1.1 $
   * @author  <a href="mailto:[EMAIL PROTECTED]">schierma</a>
   */
  public class RateLimitingPredicate implements EnqueuePredicate
  {
      /** The rate to which the enqueuing should be limited */
      private double m_targetRate;
  
      /** The depth of the token bucket */
      private int m_depth;
  
      private int m_tokenCount;
      private long m_lastTime;
      private double m_regenTimeMs;
  
      /** Number of milliseconds between regenerations */
      private static final long MIN_REGENERATION_TIME = 0;
  
      //------------------------- RateLimitingPredicate constructors
      /**
       * Create a new RateLimitingPredicate for the given sink,
       * bucket depth and no rate limit.
       * @since May 15, 2002
       *
       * @param depth
       *  The token bucket depth.
       */
      public RateLimitingPredicate(int depth)
      {
          this(-1.0, depth);
      }
  
      /**
       * Create a new RateLimitingPredicate for the given sink,
       * targetRate, and token bucket depth.
       * A rate of <m_code>-1.0</m_code> indicates no rate limit.
       * @since May 15, 2002
       *
       * @param targetRate
       *  The rate that is the target for this predicate
       * @param depth
       *  The token bucket depth.
       */
      public RateLimitingPredicate(double targetRate, int depth)
      {
          m_targetRate = targetRate;
          m_depth = depth;
  
          m_regenTimeMs = (1.0 / targetRate) * 1.0e3;
          if (m_regenTimeMs < 1)
          {
              m_regenTimeMs = 1;
          }
  
          m_tokenCount = depth;
          m_lastTime = System.currentTimeMillis();
      }
  
      //------------------------- EnqueuePredicate implementation
      /**
       * @see EnqueuePredicate#accept(Object, Sink)
       */
      public boolean accept(Object element, Sink sink)
      {
          if (m_targetRate == -1.0)
          {
              return true;
          }
  
          // First regenerate tokens
          long currentTime = System.currentTimeMillis();
          long delay = currentTime - m_lastTime;
  
          if (delay >= MIN_REGENERATION_TIME)
          {
              double numTokens = ((double) delay * 1.0) / (m_regenTimeMs * 1.0);
              m_tokenCount += numTokens;
  
              if (m_tokenCount > m_depth)
              {
                  m_tokenCount = m_depth;
              }
  
              m_lastTime = currentTime;
          }
  
          if (m_tokenCount >= 1)
          {
              m_tokenCount--;
              return true;
          }
          else
          {
              return false;
          }
      }
  
      /**
       * @see EnqueuePredicate#accept(Object, Sink)
       */
      public boolean accept(Object[] elements, Sink sink)
      {
          if (m_targetRate == -1.0)
          {
              return true;
          }
  
          // First regenerate tokens
          long currentTime = System.currentTimeMillis();
          long delay = currentTime - m_lastTime;
  
          if (delay >= MIN_REGENERATION_TIME)
          {
              double numTokens = ((double) delay * 1.0) / (m_regenTimeMs * 1.0);
              m_tokenCount += numTokens;
  
              if (m_tokenCount > m_depth)
              {
                  m_tokenCount = m_depth;
              }
              m_lastTime = currentTime;
          }
  
          if (m_tokenCount >= elements.length)
          {
              m_tokenCount -= elements.length;
              return true;
          }
          else
          {
              return false;
          }
      }
  
      //------------------------- RateLimitingPredicate specific implementation
      /**
       * Returns the current rate limit.
       * @since May 15, 2002
       *
       * @return double
       *  the current target rate
       */
      public double getTargetRate()
      {
          return m_targetRate;
      }
  
      /**
       * Returns the current depth.
       * @since May 15, 2002
       *
       * @return int
       *  The current bucket depth.
       */
      public int getDepth()
      {
          return m_depth;
      }
  
      /**
       * Returns the number of tokens currently in the bucket.
       * @since May 15, 2002
       *
       * @return int
       *  the number of tokens currently in the bucket.
       */
      public int getBucketSize()
      {
          return (int) m_tokenCount;
      }
  
      /**
       * Allows to set the rate limit. A limit of <m_code>-1.0</m_code>
       * indicates no rate limit.
       * @since May 15, 2002
       *
       * @param targetRate
       *  the current rate limit.
       */
      public void setTargetRate(double targetRate)
      {
          m_targetRate = targetRate;
  
          m_regenTimeMs = (1.0 / targetRate) * 1.0e3;
          if (m_regenTimeMs < 1)
          {
              m_regenTimeMs = 1;
          }
      }
  
      /**
       * Allows to set the bucket depth.
       * @since May 15, 2002
       *
       * @param depth
       *  The bucket depth as an integer.
       */
      public void setDepth(int depth)
      {
          m_depth = depth;
      }
  
  }
  
  
  1.1                  
avalon-excalibur/event/src/java/org/apache/excalibur/event/impl/ThresholdEnqueuePredicate.java
  
  Index: ThresholdEnqueuePredicate.java
  ===================================================================
  /*
  
   ============================================================================
                     The Apache Software License, Version 1.1
   ============================================================================
  
   Copyright (C) @year@ The Apache Software Foundation. All rights reserved.
  
   Redistribution and use in source and binary forms, with or without modifica-
   tion, are permitted provided that the following conditions are met:
  
   1. Redistributions of  source code must  retain the above copyright  notice,
      this list of conditions and the following disclaimer.
  
   2. Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.
  
   3. The end-user documentation included with the redistribution, if any, must
      include  the following  acknowledgment:  "This product includes  software
      developed  by the  Apache Software Foundation  (http://www.apache.org/)."
      Alternately, this  acknowledgment may  appear in the software itself,  if
      and wherever such third-party acknowledgments normally appear.
  
   4. The names "Jakarta", "Avalon", "Excalibur" and "Apache Software 
Foundation"
      must not be used to endorse or promote products derived from this  
software
      without  prior written permission. For written permission, please contact
      [EMAIL PROTECTED]
  
   5. Products  derived from this software may not  be called "Apache", nor may
      "Apache" appear  in their name,  without prior written permission  of the
      Apache Software Foundation.
  
   THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
   INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
   FITNESS  FOR A PARTICULAR  PURPOSE ARE  DISCLAIMED.  IN NO  EVENT SHALL  THE
   APACHE SOFTWARE  FOUNDATION  OR ITS CONTRIBUTORS  BE LIABLE FOR  ANY DIRECT,
   INDIRECT, INCIDENTAL, SPECIAL,  EXEMPLARY, OR CONSEQUENTIAL  DAMAGES (INCLU-
   DING, BUT NOT LIMITED TO, PROCUREMENT  OF SUBSTITUTE GOODS OR SERVICES; LOSS
   OF USE, DATA, OR  PROFITS; OR BUSINESS  INTERRUPTION)  HOWEVER CAUSED AND ON
   ANY  THEORY OF LIABILITY,  WHETHER  IN CONTRACT,  STRICT LIABILITY,  OR TORT
   (INCLUDING  NEGLIGENCE OR  OTHERWISE) ARISING IN  ANY WAY OUT OF THE  USE OF
   THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  
   This software  consists of voluntary contributions made  by many individuals
   on  behalf of the Apache Software  Foundation. For more  information on the
   Apache Software Foundation, please see <http://www.apache.org/>.
  
  */
  package org.apache.excalibur.event.impl;
  
  import org.apache.excalibur.event.EnqueuePredicate;
  import org.apache.excalibur.event.Sink;
  
  /**
   * The ThresholdEnqueuePredicate limits the elements that can be enqueued
   * based on the size of the Queue.
   */
  public final class ThresholdEnqueuePredicate implements EnqueuePredicate
  {
      private final int m_threshold;
  
      /**
       * Create a new ThresholdEnqueuePredicate with the supplied limit.
       *
       * @param limit  A number greater than zero
       */
      public ThresholdEnqueuePredicate(int limit)
      {
          m_threshold = limit;
      }
  
      /**
       * Returns true if the Sink size + 1 (the element) is less than the
       * threshold.
       */
      public boolean accept(Object element, Sink modifyingSink)
      {
          if ( m_threshold <=0 ) return true;
  
          return (modifyingSink.size() + 1) < m_threshold;
      }
  
      /**
       * Returns true if the Sink size + the number of elements is less than
       * the threshold.
       */
      public boolean accept(Object[] elements, Sink modifyingSink)
      {
          if ( m_threshold <=0 ) return true;
  
          return (modifyingSink.size() + elements.length) < m_threshold;
      }
  }
  
  
  
  1.8       +2 -2      
avalon-excalibur/event/src/test/org/apache/excalibur/event/test/AbstractQueueTestCase.java
  
  Index: AbstractQueueTestCase.java
  ===================================================================
  RCS file: 
/home/cvs/avalon-excalibur/event/src/test/org/apache/excalibur/event/test/AbstractQueueTestCase.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- AbstractQueueTestCase.java        3 Sep 2002 17:10:56 -0000       1.7
  +++ AbstractQueueTestCase.java        11 Feb 2003 04:40:27 -0000      1.8
  @@ -169,12 +169,12 @@
           assertEquals( 0, queue.size() );
   
           PreparedEnqueue prep = queue.prepareEnqueue( elements );
  -        assertEquals( 0, queue.size() );
  +        assertEquals( 10, queue.size() );
           prep.abort();
           assertEquals( 0, queue.size() );
   
           prep = queue.prepareEnqueue( elements );
  -        assertEquals( 0, queue.size() );
  +        assertEquals( 10, queue.size() );
           prep.commit();
           assertEquals( 10, queue.size() );
   
  
  
  
  1.11      +7 -1      
avalon-excalibur/event/src/test/org/apache/excalibur/event/test/FixedSizeQueueTestCase.java
  
  Index: FixedSizeQueueTestCase.java
  ===================================================================
  RCS file: 
/home/cvs/avalon-excalibur/event/src/test/org/apache/excalibur/event/test/FixedSizeQueueTestCase.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- FixedSizeQueueTestCase.java       3 Sep 2002 17:43:01 -0000       1.10
  +++ FixedSizeQueueTestCase.java       11 Feb 2003 04:40:27 -0000      1.11
  @@ -49,7 +49,7 @@
   */
   package org.apache.excalibur.event.test;
   
  -import org.apache.excalibur.event.impl.FixedSizeQueue;
  +import org.apache.excalibur.event.impl.*;
   
   /**
    * The default queue implementation is a variabl size queue.
  @@ -67,5 +67,11 @@
           throws Exception
       {
           this.performQueue( new FixedSizeQueue( 32 ) );
  +    }
  +
  +    public void testThresholdDefaultQueue()
  +        throws Exception
  +    {
  +        this.performQueue( new DefaultQueue( new ThresholdEnqueuePredicate( 
32 ) ) );
       }
   }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to