Author: ccustine
Date: Thu Feb 19 04:31:09 2009
New Revision: 745722
URL: http://svn.apache.org/viewvc?rev=745722&view=rev
Log:
SMXCOMP-52 smx-file async FilePollerEndpoint needs a throttling mechanism to
avoid creating excessive numbers of open exchanges and overloading the nmr
Modified:
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
Modified:
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java?rev=745722&r1=745721&r2=745722&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
(original)
+++
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
Thu Feb 19 04:31:09 2009
@@ -24,6 +24,7 @@
import java.io.InputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import javax.jbi.JBIException;
@@ -67,6 +68,9 @@
private FileMarshaler marshaler = new DefaultFileMarshaler();
private LockManager lockManager;
private ConcurrentMap<String, InputStream> openExchanges = new
ConcurrentHashMap<String, InputStream>();
+ // Limit on open exchanges used by checkThrottle(); the default of -1 (any value <= 0) disables throttling.
+ private int maxConcurrent = -1;
+ // Lock object: checkThrottle() waits on it and notifyThrottledThreads() calls notifyAll() on it.
+ private Object monitor = new Object();
+ // Count of threads currently inside the blocking section of checkThrottle(); isThrottled() is true while it is > 0.
+ private AtomicLong throttleCounter = new AtomicLong(0);
public FilePollerEndpoint() {
}
@@ -92,7 +96,12 @@
}
public void poll() throws Exception {
- pollFileOrDirectory(file);
+ if (!this.isThrottled()) {
+ pollFileOrDirectory(file);
+ } else {
+ if (logger.isDebugEnabled())
+ logger.info("Poller is throttled, skipping this cycle");
+ }
}
public void validate() throws DeploymentException {
@@ -127,6 +136,20 @@
//
-------------------------------------------------------------------------
/**
+ * Sets how many open exchanges can be pending. The default is -1, meaning
+ * unbounded pending exchanges. Set to 1...n to engage throttling of polled
+ * file processing.
+ *
+ * @param maxConcurrent maximum number of pending open exchanges; a value of
+ * 0 or less leaves throttling disabled
+ */
+ public void setMaxConcurrent(int maxConcurrent) {
+ this.maxConcurrent = maxConcurrent;
+ }
+
+ /**
+ * Returns the configured limit on pending open exchanges.
+ *
+ * @return the current maxConcurrent value (-1 by default, meaning unbounded)
+ */
+ public int getMaxConcurrent() {
+ return maxConcurrent;
+ }
+
+ /**
* Specifies the file or directory to be polled. If it is a directory, all
* files in the directory or its sub-directories will be processed by the
* endpoint. If it is a file, only files matching the filename will be
@@ -286,6 +309,9 @@
// skip the file because it is not yet fully copied over
return;
}
+
+ checkThrottle();
+
getExecutor().execute(new Runnable() {
public void run() {
String uri = file.toURI().relativize(aFile.toURI()).toString();
@@ -301,6 +327,41 @@
});
}
+ /**
+  * Checks the throttling criteria and either returns immediately or blocks
+  * the calling thread until pending exchanges have been processed.
+  * <p>
+  * Returns at once when throttling is disabled ({@code maxConcurrent <= 0})
+  * or fewer than {@code maxConcurrent} exchanges are open. Otherwise the
+  * thread waits on {@code monitor} until the number of open exchanges drops
+  * below the limit. While it waits, {@code throttleCounter} is raised so
+  * that {@code isThrottled()} reports {@code true}.
+  *
+  * @throws IllegalStateException if the thread is interrupted while waiting
+  *         and the limit is still reached; the interrupt flag is left set
+  */
+ private void checkThrottle() {
+     if (maxConcurrent <= 0 || openExchanges.size() < maxConcurrent) {
+         return;
+     }
+     throttleCounter.incrementAndGet();
+     try {
+         synchronized (monitor) {
+             boolean interrupted = false;
+             // Re-check after every wake-up: notifyAll() only signals that an
+             // exchange finished, not that the limit is no longer reached.
+             while (openExchanges.size() >= maxConcurrent) {
+                 if (interrupted) {
+                     throw new IllegalStateException("Throttle block has been interrupted");
+                 }
+                 try {
+                     monitor.wait();
+                 } catch (InterruptedException ex) {
+                     // Preserve the interrupt for the caller; the next loop
+                     // iteration aborts if the limit is still reached.
+                     Thread.currentThread().interrupt();
+                     interrupted = true;
+                 }
+             }
+         }
+     } finally {
+         // Always undo the increment. Without this, an interrupted wait left
+         // the counter above zero and isThrottled() stayed true forever.
+         throttleCounter.decrementAndGet();
+     }
+ }
+
+ /**
+ * Is this endpoint currently throttling throughput.
+ *
+ * @return
+ */
+ private boolean isThrottled() {
+ return (throttleCounter.get() > 0);
+ }
+
protected void processFileNow(File aFile) {
try {
if (logger.isDebugEnabled()) {
@@ -374,6 +435,9 @@
throw new JBIException("Unexpectedly received an exchange
with status ACTIVE");
}
} finally {
+
+ notifyThrottledThreads();
+
// remove the open exchange
openExchanges.remove(exchange.getExchangeId());
// unlock the file
@@ -387,6 +451,15 @@
}
}
+ /**
+  * Wakes every thread waiting on {@code monitor}. Does nothing when
+  * throttling is disabled ({@code maxConcurrent <= 0}).
+  * <p>
+  * NOTE(review): woken threads re-check {@code openExchanges.size()}, so
+  * this presumably needs to run after the finished exchange has been
+  * removed from {@code openExchanges} - confirm the order at the call site.
+  */
+ private void notifyThrottledThreads() {
+     if (maxConcurrent <= 0) {
+         // throttling is off, nobody can be waiting on the monitor
+         return;
+     }
+     synchronized (monitor) {
+         monitor.notifyAll();
+     }
+ }
+
/**
* unlock the file
*