Author: ccustine
Date: Thu Feb 19 04:31:09 2009
New Revision: 745722

URL: http://svn.apache.org/viewvc?rev=745722&view=rev
Log:
SMXCOMP-52 smx-file async FilePollerEndpoint needs a throttling mechanism to 
avoid creating excessive numbers of open exchanges and overloading the nmr

Modified:
    
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java

Modified: 
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
URL: 
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java?rev=745722&r1=745721&r2=745722&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java Thu Feb 19 04:31:09 2009
@@ -24,6 +24,7 @@
 import java.io.InputStream;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 
 import javax.jbi.JBIException;
@@ -67,6 +68,9 @@
     private FileMarshaler marshaler = new DefaultFileMarshaler();
     private LockManager lockManager;
     private ConcurrentMap<String, InputStream> openExchanges = new 
ConcurrentHashMap<String, InputStream>();
+    // Upper bound on pending open exchanges; values of 0 or less disable throttling.
+    private int maxConcurrent = -1;
+    // Lock object: throttled callers wait on it, completed exchanges notify it.
+    // Final so every thread always synchronizes on the same instance.
+    private final Object monitor = new Object();
+    // Incremented while a caller is held in checkThrottle(); poll() skips its
+    // cycle while this is above zero.
+    private final AtomicLong throttleCounter = new AtomicLong(0);
 
     public FilePollerEndpoint() {
     }
@@ -92,7 +96,12 @@
     }
 
     public void poll() throws Exception {
-        pollFileOrDirectory(file);
+        if (!this.isThrottled()) {
+            pollFileOrDirectory(file);
+        } else if (logger.isDebugEnabled()) {
+            // checkThrottle() still has a caller blocked; log at the level the guard tests
+            logger.debug("Poller is throttled, skipping this cycle");
+        }
     }
 
     public void validate() throws DeploymentException {
@@ -127,6 +136,20 @@
     // -------------------------------------------------------------------------
 
     /**
+     * Sets how many open exchanges can be pending. Default is -1 for unbounded
+     * pending exchanges. Set to 1...n to engage throttling of polled file
+     * processing.
+     * 
+     * @param maxConcurrent maximum number of pending open exchanges; values of 0
+     *                      or less leave the poller unthrottled
+     */
+    public void setMaxConcurrent(int maxConcurrent) {
+        this.maxConcurrent = maxConcurrent;
+    }
+
+    /**
+     * @return the configured maximum number of pending open exchanges
+     */
+    public int getMaxConcurrent() {
+        return maxConcurrent;
+    }
+
+    /**
      * Specifies the file or directory to be polled. If it is a directory, all
      * files in the directory or its sub-directories will be processed by the
      * endpoint. If it is a file, only files matching the filename will be
@@ -286,6 +309,9 @@
             // skip the file because it is not yet fully copied over
             return;
         }
+
+        checkThrottle();
+
         getExecutor().execute(new Runnable() {
             public void run() {
                 String uri = file.toURI().relativize(aFile.toURI()).toString();
@@ -301,6 +327,41 @@
         });
     }
 
+    /**
+     * Check the throttling criteria and either return immediately or block
+     * until pending exchanges are processed.
+     * <p>
+     * Throttling applies only when maxConcurrent is greater than zero. While a
+     * caller is blocked here throttleCounter is above zero, which makes poll()
+     * skip its cycle.
+     *
+     * @throws IllegalStateException if the thread is interrupted while waiting and
+     *         the number of open exchanges is still at or above maxConcurrent
+     */
+    private void checkThrottle() {
+        if (maxConcurrent > 0 && this.openExchanges.size() >= maxConcurrent) {
+            throttleCounter.incrementAndGet();
+            try {
+                synchronized (this.monitor) {
+                    InterruptedException interrupted = null;
+                    while (this.openExchanges.size() >= this.maxConcurrent) {
+                        if (interrupted != null) {
+                            throw new IllegalStateException("Throttle block has been interrupted", interrupted);
+                        }
+                        try {
+                            // Bounded wait: the completion path notifies this monitor
+                            // before it removes its exchange from openExchanges, so a
+                            // wake-up can arrive while the map still looks full.
+                            // Re-check periodically instead of relying on another
+                            // notification that may never come.
+                            this.monitor.wait(1000L);
+                        } catch (InterruptedException ex) {
+                            // Restore the flag and re-check once; the loop throws if
+                            // the limit is still exceeded.
+                            Thread.currentThread().interrupt();
+                            interrupted = ex;
+                        }
+                    }
+                }
+            } finally {
+                // Always undo the increment, even on the exception path; otherwise
+                // isThrottled() would stay true and poll() would never run again.
+                throttleCounter.decrementAndGet();
+            }
+        }
+    }
+
+    /**
+     * Reports whether at least one caller is currently held in checkThrottle().
+     *
+     * @return true when throttleCounter is greater than zero
+     */
+    private boolean isThrottled() {
+        final long blockedCallers = throttleCounter.get();
+        return blockedCallers > 0;
+    }
+
     protected void processFileNow(File aFile) {
         try {
             if (logger.isDebugEnabled()) {
@@ -374,6 +435,9 @@
                     throw new JBIException("Unexpectedly received an exchange 
with status ACTIVE");
                 }
             } finally {
+
+                notifyThrottledThreads();
+
                 // remove the open exchange
                 openExchanges.remove(exchange.getExchangeId());
                 // unlock the file
@@ -387,6 +451,15 @@
         }
     }
 
+    /**
+     * Wakes every thread waiting on the throttle monitor so it can re-check the
+     * number of open exchanges. Does nothing when throttling is disabled
+     * (maxConcurrent of 0 or less).
+     * <p>
+     * NOTE(review): the visible caller invokes this before removing its exchange
+     * from openExchanges, so a woken waiter may still see the old size; confirm
+     * the intended ordering.
+     */
+    private void notifyThrottledThreads() {
+        if (maxConcurrent <= 0) {
+            return;
+        }
+        synchronized (this.monitor) {
+            this.monitor.notifyAll();
+        }
+    }
+
     /**
      * unlock the file
      * 


Reply via email to