Author: gertv
Date: Fri May 22 11:17:54 2009
New Revision: 777471

URL: http://svn.apache.org/viewvc?rev=777471&view=rev
Log:
SMXCOMP-525: Adding unit testing for throttling and file append

Modified:
    
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
    
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java
    
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java
    
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java

Modified: 
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
URL: 
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java?rev=777471&r1=777470&r2=777471&view=diff
==============================================================================
--- 
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
 (original)
+++ 
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
 Fri May 22 11:17:54 2009
@@ -358,7 +358,7 @@
      *  
      * @return
      */
-    private boolean isThrottled() {
+    protected boolean isThrottled() {
         return (throttleCounter.get() > 0);
     }
 

Modified: 
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java
URL: 
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java?rev=777471&r1=777470&r2=777471&view=diff
==============================================================================
--- 
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java
 (original)
+++ 
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java
 Fri May 22 11:17:54 2009
@@ -124,33 +124,33 @@
                 }
             }
             if (success) {
-               if (name != null && !name.equals(newFile.getName())) {
-                       if (isAppend()) {
-                               // append mode...now we need to transfer the 
file content into the original file
-                               File targetFile = new File(directory, name);
-                               BufferedInputStream bis = new 
BufferedInputStream(new FileInputStream(newFile));
-                               out = new BufferedOutputStream(new 
FileOutputStream(targetFile, append));
-                               try {
-                                       FileUtil.copyInputStream(bis, out);
-                               } catch (IOException ioex) {
-                                       logger.error("Unable to append to file 
" + targetFile.getName(), ioex);
-                               } finally {
-                                       try {
+                if (name != null && !name.equals(newFile.getName())) {
+                    if (isAppend()) {
+                        // append mode...now we need to transfer the file 
content into the original file
+                        File targetFile = new File(directory, name);
+                        BufferedInputStream bis = new BufferedInputStream(new 
FileInputStream(newFile));
+                        out = new BufferedOutputStream(new 
FileOutputStream(targetFile, append));
+                        try {
+                            FileUtil.copyInputStream(bis, out);
+                        } catch (IOException ioex) {
+                            logger.error("Unable to append to file " + 
targetFile.getName(), ioex);
+                        } finally {
+                            try {
                                 out.close();
                             } catch (IOException e) {
                                 logger.error("Caught exception while closing 
stream on error: " + e, e);
                             }
                             if (!newFile.delete()) {
-                               throw new IOException("File " + 
newFile.getName() + " could not be deleted...");          
+                                throw new IOException("File " + 
newFile.getName() + " could not be deleted...");          
                             }
-                               }                               
-                       } else {
-                               // no append mode, so just rename it
-                               if (!newFile.renameTo(new File(directory, 
name))) {
-                                       throw new IOException("File " + 
newFile.getName() + " could not be renamed to " + name);                        
                
-                               }                                       
-                       }
-               }
+                        }                              
+                    } else {
+                        // no append mode, so just rename it
+                        if (!newFile.renameTo(new File(directory, name))) {
+                            throw new IOException("File " + newFile.getName() 
+ " could not be renamed to " + name);                                          
 
+                        }                                      
+                    }
+                }
             } else {
                 // cleaning up incomplete files after things went wrong
                 if (newFile != null) {

Modified: 
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java?rev=777471&r1=777470&r2=777471&view=diff
==============================================================================
--- 
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java
 (original)
+++ 
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java
 Fri May 22 11:17:54 2009
@@ -18,12 +18,15 @@
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
@@ -37,9 +40,13 @@
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.executors.Executor;
+import org.apache.servicemix.jbi.container.JBIContainer;
 import org.apache.servicemix.tck.mock.MockExchangeFactory;
+import org.apache.servicemix.tck.mock.MockMessageExchange;
 import org.apache.servicemix.util.FileUtil;
 
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+
 public class FilePollerEndpointTest extends TestCase {
 
     private static final File DATA = new File("target/test/data");
@@ -85,6 +92,12 @@
         endpoint.setTargetService(new QName("urn:test", "service"));
         endpoint.setLockManager(new 
org.apache.servicemix.common.locks.impl.SimpleLockManager());
     }
+    
+    @Override
+    protected void tearDown() throws Exception {
+        FileUtil.deleteFile(DATA);
+        super.tearDown();
+    }
 
     public void testValidateNoFile() throws Exception {
         try {
@@ -143,9 +156,8 @@
     }
 
     public void testProcessSuccess() throws Exception {
-        createTestFile();
         endpoint.setFile(DATA);
-        File file = new File(DATA, "test-data.xml");
+        File file = createTestFile();
         endpoint.pollFile(file);
         MessageExchange exchange = exchanges.get(0);
         exchange.setStatus(ExchangeStatus.DONE);
@@ -153,13 +165,19 @@
         assertFalse(file.exists());
     }
 
-    private void createTestFile() throws IOException {
+    private File createTestFile() throws IOException {
+        return createTestFile("test-data.xml");
+    }
+
+    private File createTestFile(String name) throws FileNotFoundException, 
IOException {
         DATA.mkdirs();
+        File testfile = new File(DATA, name);
         InputStream fis = new 
FileInputStream("target/test-classes/test-data.xml");
-        OutputStream fos = new FileOutputStream(new File(DATA, 
"test-data.xml"));
+        OutputStream fos = new FileOutputStream(testfile);
         FileUtil.copyInputStream(fis, fos);
         fis.close();
         fos.close();
+        return testfile;
     }
 
     public void testMoveFileToNonExistentDirectory() throws Exception {
@@ -173,6 +191,77 @@
             srcFile.delete();
         }
     }
+    
+    /*
+     * Test file poller endpoint throttling
+     */
+    public void testPollerThrottling() throws Exception {
+        File file = createTestFile();
+        File anotherfile = createTestFile("another-test-file.xml");
+        endpoint.setFile(DATA);
+        endpoint.setMaxConcurrent(1);
+        
+        final CountDownLatch polls = new CountDownLatch(1);
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+                try {
+                    endpoint.poll();
+                    polls.countDown();
+                } catch (Exception e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+        });
+         
+        while (exchanges.size() < 1) {
+            Thread.sleep(150);
+        }
+        assertEquals(1, exchanges.size());
+        
+        // let's wait a bit to allow throttling to kick in
+        waitForThrottling();
+        assertTrue(endpoint.isThrottled());
+        assertEquals("Background polling should still be in progress", 1, 
polls.getCount());
+        
+        // polling again now should not block another thread
+        endpoint.poll();
+        
+        // now, let's release things and make sure both files get handled
+        while (exchanges.size() > 0) {
+            MessageExchange exchange = exchanges.remove(0);
+            exchange.setStatus(ExchangeStatus.DONE);
+            endpoint.process(exchange);
+            polls.await();
+        }
+        
+        assertFalse(file.exists());
+        assertFalse(anotherfile.exists());
+    }
+    
+    public void testHandleUnknownExchange() throws Exception {
+        try {
+            endpoint.process(new MockMessageExchange() {
+                @Override
+                public String getExchangeId() {
+                    return "a-completely-bogus-exchange";
+                }
+            });
+        } catch (Exception e) {
+            fail("The endpoint should not throw exceptions for unknown 
exchanges: " + e.getMessage());
+        }
+    }
+
+    /*
+     * Let's wait for max. 750ms for the endpoint to get throttling
+     */
+    private void waitForThrottling() throws InterruptedException {
+        int count = 5;
+        while (!endpoint.isThrottled() && count > 0) {
+            Thread.sleep(150);
+            count--;
+        }
+    }
 
     @SuppressWarnings("serial")
     private static class TestException extends Exception {

Modified: 
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java?rev=777471&r1=777470&r2=777471&view=diff
==============================================================================
--- 
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java
 (original)
+++ 
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java
 Fri May 22 11:17:54 2009
@@ -21,11 +21,13 @@
 
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.xml.namespace.QName;
 
 import junit.framework.TestCase;
 
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
 import org.apache.servicemix.jbi.framework.ComponentNameSpace;
 import org.apache.servicemix.jbi.jaxp.StringSource;
 import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
@@ -34,8 +36,8 @@
 
 public class FileSenderEndpointTest extends TestCase {
 
-       private static final File OUT_DIR = new File("target/file-test");
-       private static final File OUT_FILE = new File(OUT_DIR, 
"file-exists.tmp");
+    private static final File OUT_DIR = new File("target/file-test");
+    private static final File OUT_FILE = new File(OUT_DIR, "file-exists.tmp");
     public static final String FILE_NAME_PROPERTY = 
"org.apache.servicemix.file.name";
     public static final String FILE_PATH_PROPERTY = 
"org.apache.servicemix.file.path";
     private FileSenderEndpoint endpoint;
@@ -96,10 +98,10 @@
        endpoint.setAppend(true);
        endpoint.setOverwrite(true);
        try {
-               endpoint.validate();
-               fail("validate() should fail when isAppend and isOverwrite are 
both true.");
+           endpoint.validate();
+           fail("validate() should fail when isAppend and isOverwrite are both 
true.");
        } catch (DeploymentException de) {
-               // test succeeds
+           // test succeeds
        }
     }
 
@@ -219,6 +221,43 @@
         FileUtil.deleteFile(OUT_FILE);
     }
     
+    // Test when output file exists and append is true.
+    public final void testProcessInOnlyFileExistsAppendWithTempFileName() 
throws Exception {
+        MockExchangeFactory mef = new MockExchangeFactory();
+        MessageExchange me = mef.createInOnlyExchange();
+        me.setOperation(new QName("uri", "op"));
+        me.setProperty("myProp", "myValue");
+        NormalizedMessage msg = me.createMessage();
+        msg.setProperty(FILE_PATH_PROPERTY, OUT_FILE.getAbsolutePath());
+        msg.setProperty(FILE_NAME_PROPERTY, OUT_FILE.getName());
+        msg.setContent(new StringSource("<input>input message</input>"));
+        endpoint.setDirectory(OUT_DIR);
+        endpoint.setAutoCreateDirectory(true);
+        endpoint.validate();
+        endpoint.setMarshaler(new DefaultFileMarshaler() {
+           @Override
+            public String getTempOutputName(MessageExchange exchange, 
NormalizedMessage message) throws MessagingException {
+               return super.getOutputName(exchange, message) + ".tmp";
+            } 
+        });
+        
+        // Create the initial file for later use.
+        endpoint.processInOnly(me, msg);
+        
+        long fileLength = OUT_FILE.length();
+        
+        endpoint.setOverwrite(false);
+        endpoint.setAppend(true);
+        
+        endpoint.processInOnly(me, msg);
+        
+        assertTrue("File was not overwritten: " + OUT_FILE.getAbsolutePath(), 
OUT_FILE.length() > fileLength);
+        assertFalse("Temporary file no longer exists", new 
File(OUT_FILE.getAbsolutePath() + ".tmp").exists());
+        
+        // clean up
+        FileUtil.deleteFile(OUT_FILE);
+    }
+    
     // Test when calling processInOut - not supported.
     public final void testProcessInOutNotSupported() throws Exception {
         MockExchangeFactory mef = new MockExchangeFactory();


Reply via email to