Author: gertv
Date: Fri May 22 11:17:54 2009
New Revision: 777471
URL: http://svn.apache.org/viewvc?rev=777471&view=rev
Log:
SMXCOMP-525: Adding unit testing for throttling and file append
Modified:
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java
Modified:
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java?rev=777471&r1=777470&r2=777471&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
(original)
+++
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
Fri May 22 11:17:54 2009
@@ -358,7 +358,7 @@
*
* @return
*/
- private boolean isThrottled() {
+ protected boolean isThrottled() {
return (throttleCounter.get() > 0);
}
Modified:
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java?rev=777471&r1=777470&r2=777471&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java
(original)
+++
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java
Fri May 22 11:17:54 2009
@@ -124,33 +124,33 @@
}
}
if (success) {
- if (name != null && !name.equals(newFile.getName())) {
- if (isAppend()) {
- // append mode...now we need to transfer the
file content into the original file
- File targetFile = new File(directory, name);
- BufferedInputStream bis = new
BufferedInputStream(new FileInputStream(newFile));
- out = new BufferedOutputStream(new
FileOutputStream(targetFile, append));
- try {
- FileUtil.copyInputStream(bis, out);
- } catch (IOException ioex) {
- logger.error("Unable to append to file
" + targetFile.getName(), ioex);
- } finally {
- try {
+ if (name != null && !name.equals(newFile.getName())) {
+ if (isAppend()) {
+ // append mode...now we need to transfer the file
content into the original file
+ File targetFile = new File(directory, name);
+ BufferedInputStream bis = new BufferedInputStream(new
FileInputStream(newFile));
+ out = new BufferedOutputStream(new
FileOutputStream(targetFile, append));
+ try {
+ FileUtil.copyInputStream(bis, out);
+ } catch (IOException ioex) {
+ logger.error("Unable to append to file " +
targetFile.getName(), ioex);
+ } finally {
+ try {
out.close();
} catch (IOException e) {
logger.error("Caught exception while closing
stream on error: " + e, e);
}
if (!newFile.delete()) {
- throw new IOException("File " +
newFile.getName() + " could not be deleted...");
+ throw new IOException("File " +
newFile.getName() + " could not be deleted...");
}
- }
- } else {
- // no append mode, so just rename it
- if (!newFile.renameTo(new File(directory,
name))) {
- throw new IOException("File " +
newFile.getName() + " could not be renamed to " + name);
- }
- }
- }
+ }
+ } else {
+ // no append mode, so just rename it
+ if (!newFile.renameTo(new File(directory, name))) {
+ throw new IOException("File " + newFile.getName()
+ " could not be renamed to " + name);
+ }
+ }
+ }
} else {
// cleaning up incomplete files after things went wrong
if (newFile != null) {
Modified:
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java?rev=777471&r1=777470&r2=777471&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java
(original)
+++
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FilePollerEndpointTest.java
Fri May 22 11:17:54 2009
@@ -18,12 +18,15 @@
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
@@ -37,9 +40,13 @@
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.executors.Executor;
+import org.apache.servicemix.jbi.container.JBIContainer;
import org.apache.servicemix.tck.mock.MockExchangeFactory;
+import org.apache.servicemix.tck.mock.MockMessageExchange;
import org.apache.servicemix.util.FileUtil;
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+
public class FilePollerEndpointTest extends TestCase {
private static final File DATA = new File("target/test/data");
@@ -85,6 +92,12 @@
endpoint.setTargetService(new QName("urn:test", "service"));
endpoint.setLockManager(new
org.apache.servicemix.common.locks.impl.SimpleLockManager());
}
+
+ @Override
+ protected void tearDown() throws Exception {
+ FileUtil.deleteFile(DATA);
+ super.tearDown();
+ }
public void testValidateNoFile() throws Exception {
try {
@@ -143,9 +156,8 @@
}
public void testProcessSuccess() throws Exception {
- createTestFile();
endpoint.setFile(DATA);
- File file = new File(DATA, "test-data.xml");
+ File file = createTestFile();
endpoint.pollFile(file);
MessageExchange exchange = exchanges.get(0);
exchange.setStatus(ExchangeStatus.DONE);
@@ -153,13 +165,19 @@
assertFalse(file.exists());
}
- private void createTestFile() throws IOException {
+ private File createTestFile() throws IOException {
+ return createTestFile("test-data.xml");
+ }
+
+ private File createTestFile(String name) throws FileNotFoundException,
IOException {
DATA.mkdirs();
+ File testfile = new File(DATA, name);
InputStream fis = new
FileInputStream("target/test-classes/test-data.xml");
- OutputStream fos = new FileOutputStream(new File(DATA,
"test-data.xml"));
+ OutputStream fos = new FileOutputStream(testfile);
FileUtil.copyInputStream(fis, fos);
fis.close();
fos.close();
+ return testfile;
}
public void testMoveFileToNonExistentDirectory() throws Exception {
@@ -173,6 +191,77 @@
srcFile.delete();
}
}
+
+ /*
+ * Test file poller endpoint throttling
+ */
+ public void testPollerThrottling() throws Exception {
+ File file = createTestFile();
+ File anotherfile = createTestFile("another-test-file.xml");
+ endpoint.setFile(DATA);
+ endpoint.setMaxConcurrent(1);
+
+ final CountDownLatch polls = new CountDownLatch(1);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ try {
+ endpoint.poll();
+ polls.countDown();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ });
+
+ while (exchanges.size() < 1) {
+ Thread.sleep(150);
+ }
+ assertEquals(1, exchanges.size());
+
+ // let's wait a bit to allow throttling to kick in
+ waitForThrottling();
+ assertTrue(endpoint.isThrottled());
+ assertEquals("Background polling should still be in progress", 1,
polls.getCount());
+
+ // polling again now should not block another thread
+ endpoint.poll();
+
+ // now, let's release things and make sure both files get handled
+ while (exchanges.size() > 0) {
+ MessageExchange exchange = exchanges.remove(0);
+ exchange.setStatus(ExchangeStatus.DONE);
+ endpoint.process(exchange);
+ polls.await();
+ }
+
+ assertFalse(file.exists());
+ assertFalse(anotherfile.exists());
+ }
+
+ public void testHandleUnknownExchange() throws Exception {
+ try {
+ endpoint.process(new MockMessageExchange() {
+ @Override
+ public String getExchangeId() {
+ return "a-completely-bogus-exchange";
+ }
+ });
+ } catch (Exception e) {
+ fail("The endpoint should not throw exceptions for unknown
exchanges: " + e.getMessage());
+ }
+ }
+
+ /*
+ * Let's wait for max. 750ms for the endpoint to get throttling
+ */
+ private void waitForThrottling() throws InterruptedException {
+ int count = 5;
+ while (!endpoint.isThrottled() && count > 0) {
+ Thread.sleep(150);
+ count--;
+ }
+ }
@SuppressWarnings("serial")
private static class TestException extends Exception {
Modified:
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java?rev=777471&r1=777470&r2=777471&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java
(original)
+++
servicemix/components/bindings/servicemix-file/trunk/src/test/java/org/apache/servicemix/file/FileSenderEndpointTest.java
Fri May 22 11:17:54 2009
@@ -21,11 +21,13 @@
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
import junit.framework.TestCase;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
import org.apache.servicemix.jbi.framework.ComponentNameSpace;
import org.apache.servicemix.jbi.jaxp.StringSource;
import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
@@ -34,8 +36,8 @@
public class FileSenderEndpointTest extends TestCase {
- private static final File OUT_DIR = new File("target/file-test");
- private static final File OUT_FILE = new File(OUT_DIR,
"file-exists.tmp");
+ private static final File OUT_DIR = new File("target/file-test");
+ private static final File OUT_FILE = new File(OUT_DIR, "file-exists.tmp");
public static final String FILE_NAME_PROPERTY =
"org.apache.servicemix.file.name";
public static final String FILE_PATH_PROPERTY =
"org.apache.servicemix.file.path";
private FileSenderEndpoint endpoint;
@@ -96,10 +98,10 @@
endpoint.setAppend(true);
endpoint.setOverwrite(true);
try {
- endpoint.validate();
- fail("validate() should fail when isAppend and isOverwrite are
both true.");
+ endpoint.validate();
+ fail("validate() should fail when isAppend and isOverwrite are both
true.");
} catch (DeploymentException de) {
- // test succeeds
+ // test succeeds
}
}
@@ -219,6 +221,43 @@
FileUtil.deleteFile(OUT_FILE);
}
+ // Test when output file exists and append is true.
+ public final void testProcessInOnlyFileExistsAppendWithTempFileName()
throws Exception {
+ MockExchangeFactory mef = new MockExchangeFactory();
+ MessageExchange me = mef.createInOnlyExchange();
+ me.setOperation(new QName("uri", "op"));
+ me.setProperty("myProp", "myValue");
+ NormalizedMessage msg = me.createMessage();
+ msg.setProperty(FILE_PATH_PROPERTY, OUT_FILE.getAbsolutePath());
+ msg.setProperty(FILE_NAME_PROPERTY, OUT_FILE.getName());
+ msg.setContent(new StringSource("<input>input message</input>"));
+ endpoint.setDirectory(OUT_DIR);
+ endpoint.setAutoCreateDirectory(true);
+ endpoint.validate();
+ endpoint.setMarshaler(new DefaultFileMarshaler() {
+ @Override
+ public String getTempOutputName(MessageExchange exchange,
NormalizedMessage message) throws MessagingException {
+ return super.getOutputName(exchange, message) + ".tmp";
+ }
+ });
+
+ // Create the initial file for later use.
+ endpoint.processInOnly(me, msg);
+
+ long fileLength = OUT_FILE.length();
+
+ endpoint.setOverwrite(false);
+ endpoint.setAppend(true);
+
+ endpoint.processInOnly(me, msg);
+
+ assertTrue("File was not overwritten: " + OUT_FILE.getAbsolutePath(),
OUT_FILE.length() > fileLength);
+ assertFalse("Temporary file no longer exists", new
File(OUT_FILE.getAbsolutePath() + ".tmp").exists());
+
+ // clean up
+ FileUtil.deleteFile(OUT_FILE);
+ }
+
// Test when calling processInOut - not supported.
public final void testProcessInOutNotSupported() throws Exception {
MockExchangeFactory mef = new MockExchangeFactory();