SquelchBuilder
--------------

                 Key: CAMEL-101
                 URL: https://issues.apache.org/activemq/browse/CAMEL-101
             Project: Apache Camel
          Issue Type: New Feature
          Components: camel-core
            Reporter: Noah Nordrum


Feel free to break the inner class out into its own top-level class, too.

A builder that limits throughput on a given route by enforcing a minimum spacing between messages.



package org.apache.camel.builder;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;

/**
 * Builder that limits throughput on a route by enforcing a minimum spacing,
 * in milliseconds, between consecutive exchanges handed to the child processor.
 */
public class SquelchBuilder extends FromBuilder {
    private final long minMessageSpacingInMs;

    /**
     * @param parent                the builder this one is chained from; handed to the
     *                              superclass constructor
     * @param minMessageSpacingInMs minimum time, in milliseconds, that must elapse after one
     *                              exchange has been processed before the next one is let through
     */
    public SquelchBuilder(FromBuilder parent, long minMessageSpacingInMs) {
        super(parent);
        this.minMessageSpacingInMs = minMessageSpacingInMs;
    }

    /**
     * Wraps the processor built by the superclass in a {@link SquelchProcessor}
     * configured with this builder's minimum message spacing.
     *
     * @return the throttling processor
     * @throws Exception if the superclass fails to create the child processor
     */
    public SquelchProcessor createProcessor() throws Exception {
        // a single child processor sits behind the throttle
        final Processor childProcessor = super.createProcessor();
        return new SquelchProcessor(minMessageSpacingInMs, childProcessor);
    }

    /**
     * Processor that delays each exchange until at least the configured spacing has
     * passed since the previous exchange finished, then delegates to the wrapped processor.
     * <p>
     * Declared static because it uses no state of the enclosing builder.
     * <p>
     * NOTE(review): {@code nextLetThroughTime} is read and written without any
     * synchronization, so the spacing is only enforced reliably when {@link #process}
     * is invoked by one thread at a time - confirm against how the route is consumed.
     */
    static class SquelchProcessor extends ServiceSupport implements Processor {
        private final long minMessageSpacingInMs;
        private final Processor processor;
        // wall-clock time (ms) before which the next exchange must not be let through
        private long nextLetThroughTime;

        /**
         * @param minMessageSpacingInMs minimum spacing between exchanges, in milliseconds
         * @param processor             the processor each exchange is delegated to
         */
        public SquelchProcessor(long minMessageSpacingInMs, Processor processor) {
            this.minMessageSpacingInMs = minMessageSpacingInMs;
            this.processor = processor;
        }

        /**
         * Sleeps the calling thread until the let-through time is reached (if it has not
         * been already), delegates the exchange, then schedules the next let-through time
         * relative to the moment the delegate returned.
         *
         * @param exchange the exchange to pass on
         * @throws Exception anything thrown by the wrapped processor, or
         *                   {@link InterruptedException} if the wait is interrupted
         */
        public void process(Exchange exchange) throws Exception {
            final long now = System.currentTimeMillis();
            // The wait can never legitimately exceed the configured spacing; capping it
            // keeps a backwards wall-clock adjustment from stalling the route for the
            // size of the clock jump.
            final long sleepTime = Math.min(nextLetThroughTime - now, minMessageSpacingInMs);
            if (sleepTime > 0) {
                Thread.sleep(sleepTime);
            }
            processor.process(exchange);
            // measure the spacing from completion of this exchange, not from its arrival
            nextLetThroughTime = System.currentTimeMillis() + minMessageSpacingInMs;
        }

        /** Starts the wrapped processor via {@link ServiceHelper#startServices}. */
        protected void doStart() throws Exception {
            ServiceHelper.startServices(processor);
        }

        /** Stops the wrapped processor via {@link ServiceHelper#stopServices}. */
        protected void doStop() throws Exception {
            ServiceHelper.stopServices(processor);
        }
    }
}




Add this method to FromBuilder (the annotations might need to change; I'm not
sure how they affect things):
    /**
     * Appends a throttling step to this route: creates a {@link SquelchBuilder} chained
     * from this builder, registers it through {@code addProcessBuilder}, and returns it
     * so the route definition can continue from it.
     *
     * @param minMessageSpacingInMs minimum spacing between messages, in milliseconds,
     *                              passed straight to the {@link SquelchBuilder} constructor
     * @return the newly registered squelch builder
     */
    @Fluent
    public SquelchBuilder squelch(
            @FluentArg(value = "minMessageSpacingInMs", element = true)
            long minMessageSpacingInMs) {
        final SquelchBuilder squelchBuilder =
                new SquelchBuilder(this, minMessageSpacingInMs);

        addProcessBuilder(squelchBuilder);

        return squelchBuilder;
    }


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to