Author: davsclaus
Date: Sun Apr 22 08:37:16 2012
New Revision: 1328814
URL: http://svn.apache.org/viewvc?rev=1328814&view=rev
Log:
CAMEL-5202: Added option eagerMaxMessagesPerPoll to file/ftp endpoints.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
- copied, changed from r1328800,
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
- copied, changed from r1328800,
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=1328814&r1=1328813&r2=1328814&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
Sun Apr 22 08:37:16 2012
@@ -84,6 +84,7 @@ public class FileEndpoint extends Generi
// set max messages per poll
result.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+ result.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
configureConsumer(result);
return result;
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1328814&r1=1328813&r2=1328814&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
Sun Apr 22 08:37:16 2012
@@ -18,6 +18,7 @@ package org.apache.camel.component.file;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
@@ -46,6 +47,7 @@ public abstract class GenericFileConsume
protected volatile ShutdownRunningTask shutdownRunningTask;
protected volatile int pendingExchanges;
protected Processor customProcessor;
+ protected boolean eagerLimitMaxMessagesPerPoll = true;
public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor
processor, GenericFileOperations<T> operations) {
super(endpoint, processor);
@@ -72,9 +74,18 @@ public abstract class GenericFileConsume
this.customProcessor = processor;
}
+ public boolean isEagerLimitMaxMessagesPerPoll() {
+ return eagerLimitMaxMessagesPerPoll;
+ }
+
+ public void setEagerLimitMaxMessagesPerPoll(boolean
eagerLimitMaxMessagesPerPoll) {
+ this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
+ }
+
/**
* Poll for files
*/
+ @SuppressWarnings("unchecked")
protected int poll() throws Exception {
// must reset for each poll
fileExpressionResult = null;
@@ -111,6 +122,7 @@ public abstract class GenericFileConsume
}
// sort using built-in sorters so we can use expressions
+ // use a linked list so we can dequeue the exchanges
LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
for (GenericFile<T> file : files) {
Exchange exchange = endpoint.createExchange(file);
@@ -123,13 +135,24 @@ public abstract class GenericFileConsume
Collections.sort(exchanges, endpoint.getSortBy());
}
+ // use a queue for the exchanges
+ Deque<Exchange> q = exchanges;
+
+ // we are not eager limiting, but we have configured a limit, so cut
the list of files
+ if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
+ if (files.size() > maxMessagesPerPoll) {
+ log.debug("Limiting maximum messages to poll at {} files as
there was more messages in this poll.", maxMessagesPerPoll);
+ // must first remove excessive files from the in progress
repository
+ removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
+ }
+ }
+
// consume files one by one
int total = exchanges.size();
if (total > 0) {
log.debug("Total {} files to consume", total);
}
- Queue<Exchange> q = exchanges;
int polledMessages = processBatch(CastUtils.cast(q));
postPollCheck();
@@ -137,7 +160,6 @@ public abstract class GenericFileConsume
return polledMessages;
}
-
@SuppressWarnings("unchecked")
public int processBatch(Queue<Object> exchanges) {
int total = exchanges.size();
@@ -170,15 +192,22 @@ public abstract class GenericFileConsume
}
}
+ // drain any in progress files as we are done with this batch
+ removeExcessiveInProgressFiles((Deque) exchanges, 0);
+
+ return total;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void removeExcessiveInProgressFiles(Deque exchanges, int limit) {
// remove the file from the in progress list in case the batch was
limited by max messages per poll
- while (exchanges.size() > 0) {
- Exchange exchange = (Exchange) exchanges.poll();
+ while (exchanges.size() > limit) {
+ // must remove last
+ Exchange exchange = (Exchange) exchanges.removeLast();
GenericFile<T> file = (GenericFile<T>)
exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
String key = file.getAbsoluteFilePath();
endpoint.getInProgressRepository().remove(key);
}
-
- return total;
}
@@ -189,6 +218,11 @@ public abstract class GenericFileConsume
* @return <tt>true</tt> to continue, <tt>false</tt> to stop due to hitting
maxMessagesPerPoll limit
*/
public boolean canPollMoreFiles(List<?> fileList) {
+ // at this point we should not limit if we are not eager
+ if (!eagerLimitMaxMessagesPerPoll) {
+ return true;
+ }
+
if (maxMessagesPerPoll <= 0) {
// no limitation
return true;
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1328814&r1=1328813&r2=1328814&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
Sun Apr 22 08:37:16 2012
@@ -71,6 +71,7 @@ public abstract class GenericFileEndpoin
protected boolean delete;
protected boolean flatten;
protected int maxMessagesPerPoll;
+ protected boolean eagerMaxMessagesPerPoll = true;
protected int maxDepth = Integer.MAX_VALUE;
protected int minDepth;
protected String tempPrefix;
@@ -575,6 +576,14 @@ public abstract class GenericFileEndpoin
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
+ public boolean isEagerMaxMessagesPerPoll() {
+ return eagerMaxMessagesPerPoll;
+ }
+
+ public void setEagerMaxMessagesPerPoll(boolean eagerMaxMessagesPerPoll) {
+ this.eagerMaxMessagesPerPoll = eagerMaxMessagesPerPoll;
+ }
+
public int getMaxDepth() {
return maxDepth;
}
@@ -584,7 +593,6 @@ public abstract class GenericFileEndpoin
}
public int getMinDepth() {
-
return minDepth;
}
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
(from r1328800,
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java&r1=1328800&r2=1328814&rev=1328814&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
Sun Apr 22 08:37:16 2012
@@ -24,29 +24,31 @@ import org.apache.camel.component.mock.M
/**
* Unit test for max messages per poll
*/
-public class FileConsumeMaxMessagesPerPollTest extends ContextTestSupport {
+public class FileConsumeNotEagerMaxMessagesPerPollTest extends
ContextTestSupport {
- private String fileUrl =
"file://target/poll/?initialDelay=2000&delay=5000&maxMessagesPerPoll=2";
+ // sort by name and not eager, then we should pickup the files in order
+ private String fileUrl =
"file://target/poll/?initialDelay=2000&delay=5000&"
+ +
"maxMessagesPerPoll=2&eagerMaxMessagesPerPoll=false&sortBy=file:name";
@Override
protected void setUp() throws Exception {
deleteDirectory("target/poll");
super.setUp();
- template.sendBodyAndHeader(fileUrl, "Bye World", Exchange.FILE_NAME,
"bye.txt");
- template.sendBodyAndHeader(fileUrl, "Hello World", Exchange.FILE_NAME,
"hello.txt");
- template.sendBodyAndHeader(fileUrl, "Godday World",
Exchange.FILE_NAME, "godday.txt");
+ template.sendBodyAndHeader(fileUrl, "CCC", Exchange.FILE_NAME,
"ccc.txt");
+ template.sendBodyAndHeader(fileUrl, "AAA", Exchange.FILE_NAME,
"aaa.txt");
+ template.sendBodyAndHeader(fileUrl, "BBB", Exchange.FILE_NAME,
"bbb.txt");
}
public void testMaxMessagesPerPoll() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(2);
+ mock.expectedBodiesReceived("AAA", "BBB");
mock.setResultWaitTime(3000);
mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
assertMockEndpointsSatisfied();
mock.reset();
- mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived("CCC");
mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 1);
assertMockEndpointsSatisfied();
Modified:
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java?rev=1328814&r1=1328813&r2=1328814&view=diff
==============================================================================
---
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
(original)
+++
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
Sun Apr 22 08:37:16 2012
@@ -92,6 +92,7 @@ public abstract class RemoteFileEndpoint
// set max messages per poll
consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+ consumer.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
configureConsumer(consumer);
return consumer;
Copied:
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
(from r1328800,
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java&r1=1328800&r2=1328814&rev=1328814&view=diff
==============================================================================
---
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java
(original)
+++
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
Sun Apr 22 08:37:16 2012
@@ -25,10 +25,11 @@ import org.junit.Test;
/**
* @version
*/
-public class FtpConsumerMaxMessagesPerPollTest extends FtpServerTestSupport {
+public class FtpConsumerNotEagerMaxMessagesPerPollTest extends
FtpServerTestSupport {
private String getFtpUrl() {
- return "ftp://admin@localhost:" + getPort() +
"/poll/?password=admin&delay=6000&delete=true&sortBy=file:name&maxMessagesPerPoll=2";
+ return "ftp://admin@localhost:" + getPort() +
"/poll/?password=admin&delay=6000&delete=true"
+ +
"&sortBy=file:name&maxMessagesPerPoll=2&eagerMaxMessagesPerPoll=false";
}
@Override
@@ -44,23 +45,23 @@ public class FtpConsumerMaxMessagesPerPo
context.startRoute("foo");
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Bye World", "Godday World");
+ mock.expectedBodiesReceived("AAA", "BBB");
mock.setResultWaitTime(4000);
mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
assertMockEndpointsSatisfied();
mock.reset();
- mock.expectedBodiesReceived("Hello World");
+ mock.expectedBodiesReceived("CCC");
mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 1);
assertMockEndpointsSatisfied();
}
private void prepareFtpServer() throws Exception {
- sendFile(getFtpUrl(), "Bye World", "bye.txt");
- sendFile(getFtpUrl(), "Hello World", "hello.txt");
- sendFile(getFtpUrl(), "Godday World", "godday.txt");
+ sendFile(getFtpUrl(), "CCC", "ccc.txt");
+ sendFile(getFtpUrl(), "AAA", "aaa.txt");
+ sendFile(getFtpUrl(), "BBB", "bbb.txt");
}
protected RouteBuilder createRouteBuilder() throws Exception {