Author: davsclaus
Date: Sun Apr 22 08:37:16 2012
New Revision: 1328814

URL: http://svn.apache.org/viewvc?rev=1328814&view=rev
Log:
CAMEL-5202: Added option eagerLimitMaxMessagesPerPoll to file/ftp endpoints.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
      - copied, changed from r1328800, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java
    
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
      - copied, changed from r1328800, 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=1328814&r1=1328813&r2=1328814&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
 Sun Apr 22 08:37:16 2012
@@ -84,6 +84,7 @@ public class FileEndpoint extends Generi
 
         // set max messages per poll
         result.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+        result.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
 
         configureConsumer(result);
         return result;

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1328814&r1=1328813&r2=1328814&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 Sun Apr 22 08:37:16 2012
@@ -18,6 +18,7 @@ package org.apache.camel.component.file;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -46,6 +47,7 @@ public abstract class GenericFileConsume
     protected volatile ShutdownRunningTask shutdownRunningTask;
     protected volatile int pendingExchanges;
     protected Processor customProcessor;
+    protected boolean eagerLimitMaxMessagesPerPoll = true;
 
     public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor 
processor, GenericFileOperations<T> operations) {
         super(endpoint, processor);
@@ -72,9 +74,18 @@ public abstract class GenericFileConsume
         this.customProcessor = processor;
     }
 
+    public boolean isEagerLimitMaxMessagesPerPoll() {
+        return eagerLimitMaxMessagesPerPoll;
+    }
+
+    public void setEagerLimitMaxMessagesPerPoll(boolean 
eagerLimitMaxMessagesPerPoll) {
+        this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
+    }
+
     /**
      * Poll for files
      */
+    @SuppressWarnings("unchecked")
     protected int poll() throws Exception {
         // must reset for each poll
         fileExpressionResult = null;
@@ -111,6 +122,7 @@ public abstract class GenericFileConsume
         }
 
         // sort using build in sorters so we can use expressions
+        // use a linked list so we can deque the exchanges
         LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
         for (GenericFile<T> file : files) {
             Exchange exchange = endpoint.createExchange(file);
@@ -123,13 +135,24 @@ public abstract class GenericFileConsume
             Collections.sort(exchanges, endpoint.getSortBy());
         }
 
+        // use a queue for the exchanges
+        Deque<Exchange> q = exchanges;
+
+        // we are not eager limiting, but we have configured a limit, so cut 
the list of files
+        if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
+            if (files.size() > maxMessagesPerPoll) {
+                log.debug("Limiting maximum messages to poll at {} files as 
there was more messages in this poll.", maxMessagesPerPoll);
+                // must first remove excessive files from the in progress 
repository
+                removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
+            }
+        }
+
         // consume files one by one
         int total = exchanges.size();
         if (total > 0) {
             log.debug("Total {} files to consume", total);
         }
 
-        Queue<Exchange> q = exchanges;
         int polledMessages = processBatch(CastUtils.cast(q));
 
         postPollCheck();
@@ -137,7 +160,6 @@ public abstract class GenericFileConsume
         return polledMessages;
     }
 
-
     @SuppressWarnings("unchecked")
     public int processBatch(Queue<Object> exchanges) {
         int total = exchanges.size();
@@ -170,15 +192,22 @@ public abstract class GenericFileConsume
             }
         }
 
+        // drain any in progress files as we are done with this batch
+        removeExcessiveInProgressFiles((Deque) exchanges, 0);
+
+        return total;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void removeExcessiveInProgressFiles(Deque exchanges, int limit) {
         // remove the file from the in progress list in case the batch was 
limited by max messages per poll
-        while (exchanges.size() > 0) {
-            Exchange exchange = (Exchange) exchanges.poll();
+        while (exchanges.size() > limit) {
+            // must remove last
+            Exchange exchange = (Exchange) exchanges.removeLast();
             GenericFile<T> file = (GenericFile<T>) 
exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
             String key = file.getAbsoluteFilePath();
             endpoint.getInProgressRepository().remove(key);
         }
-
-        return total;
     }
 
 
@@ -189,6 +218,11 @@ public abstract class GenericFileConsume
      * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting 
maxMessagesPerPoll limit
      */
     public boolean canPollMoreFiles(List<?> fileList) {
+        // at this point we should not limit if we are not eager
+        if (!eagerLimitMaxMessagesPerPoll) {
+            return true;
+        }
+
         if (maxMessagesPerPoll <= 0) {
             // no limitation
             return true;

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1328814&r1=1328813&r2=1328814&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 Sun Apr 22 08:37:16 2012
@@ -71,6 +71,7 @@ public abstract class GenericFileEndpoin
     protected boolean delete;
     protected boolean flatten;
     protected int maxMessagesPerPoll;
+    protected boolean eagerMaxMessagesPerPoll = true;
     protected int maxDepth = Integer.MAX_VALUE;
     protected int minDepth;
     protected String tempPrefix;
@@ -575,6 +576,14 @@ public abstract class GenericFileEndpoin
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
 
+    public boolean isEagerMaxMessagesPerPoll() {
+        return eagerMaxMessagesPerPoll;
+    }
+
+    public void setEagerMaxMessagesPerPoll(boolean eagerMaxMessagesPerPoll) {
+        this.eagerMaxMessagesPerPoll = eagerMaxMessagesPerPoll;
+    }
+
     public int getMaxDepth() {
         return maxDepth;
     }
@@ -584,7 +593,6 @@ public abstract class GenericFileEndpoin
     }
 
     public int getMinDepth() {
-
         return minDepth;
     }
 

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
 (from r1328800, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java&r1=1328800&r2=1328814&rev=1328814&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
 Sun Apr 22 08:37:16 2012
@@ -24,29 +24,31 @@ import org.apache.camel.component.mock.M
 /**
  * Unit test for max messages per poll
  */
-public class FileConsumeMaxMessagesPerPollTest extends ContextTestSupport {
+public class FileConsumeNotEagerMaxMessagesPerPollTest extends 
ContextTestSupport {
 
-    private String fileUrl = 
"file://target/poll/?initialDelay=2000&delay=5000&maxMessagesPerPoll=2";
+    // sort by name and not eager, then we should pickup the files in order
+    private String fileUrl = 
"file://target/poll/?initialDelay=2000&delay=5000&"
+            + 
"maxMessagesPerPoll=2&eagerMaxMessagesPerPoll=false&sortBy=file:name";
 
     @Override
     protected void setUp() throws Exception {
         deleteDirectory("target/poll");
         super.setUp();
-        template.sendBodyAndHeader(fileUrl, "Bye World", Exchange.FILE_NAME, 
"bye.txt");
-        template.sendBodyAndHeader(fileUrl, "Hello World", Exchange.FILE_NAME, 
"hello.txt");
-        template.sendBodyAndHeader(fileUrl, "Godday World", 
Exchange.FILE_NAME, "godday.txt");
+        template.sendBodyAndHeader(fileUrl, "CCC", Exchange.FILE_NAME, 
"ccc.txt");
+        template.sendBodyAndHeader(fileUrl, "AAA", Exchange.FILE_NAME, 
"aaa.txt");
+        template.sendBodyAndHeader(fileUrl, "BBB", Exchange.FILE_NAME, 
"bbb.txt");
     }
 
     public void testMaxMessagesPerPoll() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(2);
+        mock.expectedBodiesReceived("AAA", "BBB");
         mock.setResultWaitTime(3000);
         mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
 
         assertMockEndpointsSatisfied();
 
         mock.reset();
-        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived("CCC");
         mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 1);
 
         assertMockEndpointsSatisfied();

Modified: 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java?rev=1328814&r1=1328813&r2=1328814&view=diff
==============================================================================
--- 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
 Sun Apr 22 08:37:16 2012
@@ -92,6 +92,7 @@ public abstract class RemoteFileEndpoint
 
         // set max messages per poll
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+        consumer.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
 
         configureConsumer(consumer);
         return consumer;

Copied: 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
 (from r1328800, 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java&r1=1328800&r2=1328814&rev=1328814&view=diff
==============================================================================
--- 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java
 (original)
+++ 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
 Sun Apr 22 08:37:16 2012
@@ -25,10 +25,11 @@ import org.junit.Test;
 /**
  * @version 
  */
-public class FtpConsumerMaxMessagesPerPollTest extends FtpServerTestSupport {
+public class FtpConsumerNotEagerMaxMessagesPerPollTest extends 
FtpServerTestSupport {
 
     private String getFtpUrl() {
-        return "ftp://admin@localhost:"; + getPort() + 
"/poll/?password=admin&delay=6000&delete=true&sortBy=file:name&maxMessagesPerPoll=2";
+        return "ftp://admin@localhost:"; + getPort() + 
"/poll/?password=admin&delay=6000&delete=true"
+                + 
"&sortBy=file:name&maxMessagesPerPoll=2&eagerMaxMessagesPerPoll=false";
     }
 
     @Override
@@ -44,23 +45,23 @@ public class FtpConsumerMaxMessagesPerPo
         context.startRoute("foo");
 
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Bye World", "Godday World");
+        mock.expectedBodiesReceived("AAA", "BBB");
         mock.setResultWaitTime(4000);
         mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
 
         assertMockEndpointsSatisfied();
 
         mock.reset();
-        mock.expectedBodiesReceived("Hello World");
+        mock.expectedBodiesReceived("CCC");
         mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 1);
 
         assertMockEndpointsSatisfied();
     }
     
     private void prepareFtpServer() throws Exception {
-        sendFile(getFtpUrl(), "Bye World", "bye.txt");
-        sendFile(getFtpUrl(), "Hello World", "hello.txt");
-        sendFile(getFtpUrl(), "Godday World", "godday.txt");
+        sendFile(getFtpUrl(), "CCC", "ccc.txt");
+        sendFile(getFtpUrl(), "AAA", "aaa.txt");
+        sendFile(getFtpUrl(), "BBB", "bbb.txt");
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {


Reply via email to