Author: davsclaus
Date: Sun Apr 22 08:43:02 2012
New Revision: 1328817
URL: http://svn.apache.org/viewvc?rev=1328817&view=rev
Log:
CAMEL-5202: Added option eagerMaxMessagesPerPoll to file/ftp endpoints (propagated to the consumer as eagerLimitMaxMessagesPerPoll).
Added:
camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
- copied unchanged from r1328814,
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeNotEagerMaxMessagesPerPollTest.java
camel/branches/camel-2.9.x/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
- copied unchanged from r1328814,
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerNotEagerMaxMessagesPerPollTest.java
Modified:
camel/branches/camel-2.9.x/ (props changed)
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1328814
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=1328817&r1=1328816&r2=1328817&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
(original)
+++
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
Sun Apr 22 08:43:02 2012
@@ -84,6 +84,7 @@ public class FileEndpoint extends Generi
// set max messages per poll
result.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+ result.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
configureConsumer(result);
return result;
Modified:
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1328817&r1=1328816&r2=1328817&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
(original)
+++
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
Sun Apr 22 08:43:02 2012
@@ -18,6 +18,7 @@ package org.apache.camel.component.file;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
@@ -49,6 +50,7 @@ public abstract class GenericFileConsume
protected volatile ShutdownRunningTask shutdownRunningTask;
protected volatile int pendingExchanges;
protected Processor customProcessor;
+ protected boolean eagerLimitMaxMessagesPerPoll = true;
public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor
processor, GenericFileOperations<T> operations) {
super(endpoint, processor);
@@ -75,9 +77,18 @@ public abstract class GenericFileConsume
this.customProcessor = processor;
}
+ public boolean isEagerLimitMaxMessagesPerPoll() {
+ return eagerLimitMaxMessagesPerPoll;
+ }
+
+ public void setEagerLimitMaxMessagesPerPoll(boolean
eagerLimitMaxMessagesPerPoll) {
+ this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
+ }
+
/**
* Poll for files
*/
+ @SuppressWarnings("unchecked")
protected int poll() throws Exception {
// must reset for each poll
fileExpressionResult = null;
@@ -114,6 +125,7 @@ public abstract class GenericFileConsume
}
// sort using build in sorters so we can use expressions
+ // use a linked list so we can deque the exchanges
LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
for (GenericFile<T> file : files) {
Exchange exchange = endpoint.createExchange(file);
@@ -126,13 +138,24 @@ public abstract class GenericFileConsume
Collections.sort(exchanges, endpoint.getSortBy());
}
+ // use a queue for the exchanges
+ Deque<Exchange> q = exchanges;
+
+ // we are not eager limiting, but we have configured a limit, so cut
the list of files
+ if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
+ if (files.size() > maxMessagesPerPoll) {
+ log.debug("Limiting maximum messages to poll at {} files as
there was more messages in this poll.", maxMessagesPerPoll);
+ // must first remove excessive files from the in progress
repository
+ removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
+ }
+ }
+
// consume files one by one
int total = exchanges.size();
if (total > 0) {
log.debug("Total {} files to consume", total);
}
- Queue<Exchange> q = exchanges;
int polledMessages = processBatch(CastUtils.cast(q));
postPollCheck();
@@ -176,15 +199,22 @@ public abstract class GenericFileConsume
}
}
+ // drain any in progress files as we are done with this batch
+ removeExcessiveInProgressFiles((Deque) exchanges, 0);
+
+ return total;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void removeExcessiveInProgressFiles(Deque exchanges, int limit) {
// remove the file from the in progress list in case the batch was
limited by max messages per poll
- while (exchanges.size() > 0) {
- Exchange exchange = (Exchange) exchanges.poll();
+ while (exchanges.size() > limit) {
+ // must remove last
+ Exchange exchange = (Exchange) exchanges.removeLast();
GenericFile<T> file = (GenericFile<T>)
exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
String key = file.getAbsoluteFilePath();
endpoint.getInProgressRepository().remove(key);
}
-
- return total;
}
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
@@ -242,6 +272,11 @@ public abstract class GenericFileConsume
* @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting
maxMessagesPerPoll limit
*/
public boolean canPollMoreFiles(List<?> fileList) {
+ // at this point we should not limit if we are not eager
+ if (!eagerLimitMaxMessagesPerPoll) {
+ return true;
+ }
+
if (maxMessagesPerPoll <= 0) {
// no limitation
return true;
Modified:
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1328817&r1=1328816&r2=1328817&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
(original)
+++
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
Sun Apr 22 08:43:02 2012
@@ -71,6 +71,7 @@ public abstract class GenericFileEndpoin
protected boolean delete;
protected boolean flatten;
protected int maxMessagesPerPoll;
+ protected boolean eagerMaxMessagesPerPoll = true;
protected int maxDepth = Integer.MAX_VALUE;
protected int minDepth;
protected String tempPrefix;
@@ -556,6 +557,14 @@ public abstract class GenericFileEndpoin
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
+ public boolean isEagerMaxMessagesPerPoll() {
+ return eagerMaxMessagesPerPoll;
+ }
+
+ public void setEagerMaxMessagesPerPoll(boolean eagerMaxMessagesPerPoll) {
+ this.eagerMaxMessagesPerPoll = eagerMaxMessagesPerPoll;
+ }
+
public int getMaxDepth() {
return maxDepth;
}
@@ -565,7 +574,6 @@ public abstract class GenericFileEndpoin
}
public int getMinDepth() {
-
return minDepth;
}
Modified:
camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java?rev=1328817&r1=1328816&r2=1328817&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
(original)
+++
camel/branches/camel-2.9.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
Sun Apr 22 08:43:02 2012
@@ -92,6 +92,7 @@ public abstract class RemoteFileEndpoint
// set max messages per poll
consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+ consumer.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
configureConsumer(consumer);
return consumer;