Author: cmueller
Date: Wed Apr 11 20:55:43 2012
New Revision: 1324997
URL: http://svn.apache.org/viewvc?rev=1324997&view=rev
Log:
CAMEL-3776: Add pooling support for JAXB data format
Modified:
camel/branches/camel-2.8.x/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java
camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java
Modified:
camel/branches/camel-2.8.x/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java?rev=1324997&r1=1324996&r2=1324997&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jaxb/src/main/java/org/apache/camel/converter/jaxb/JaxbDataFormat.java Wed Apr 11 20:55:43 2012
@@ -21,6 +21,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
+import java.util.concurrent.locks.ReentrantLock;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
@@ -67,6 +68,9 @@ public class JaxbDataFormat extends Serv
private String partClass;
private Class partialClass;
+ private Unmarshaller unmarshaller;
+ private ReentrantLock lock = new ReentrantLock();
+
public JaxbDataFormat() {
}
@@ -132,26 +136,29 @@ public class JaxbDataFormat extends Serv
@SuppressWarnings("unchecked")
public Object unmarshal(Exchange exchange, InputStream stream) throws IOException {
try {
- // must create a new instance of unmarshaller as its not thread safe
Object answer;
- Unmarshaller unmarshaller = getContext().createUnmarshaller();
- if (partialClass != null) {
- // partial unmarshalling
- Source source;
- if (needFiltering(exchange)) {
- source = new StreamSource(createNonXmlFilterReader(exchange, stream));
+ lock.lock();
+ try {
+ if (partialClass != null) {
+ // partial unmarshalling
+ Source source;
+ if (needFiltering(exchange)) {
+ source = new StreamSource(createNonXmlFilterReader(exchange, stream));
+ } else {
+ source = new StreamSource(stream);
+ }
+ answer = unmarshaller.unmarshal(source, partialClass);
} else {
- source = new StreamSource(stream);
- }
- answer = unmarshaller.unmarshal(source, partialClass);
- } else {
- if (needFiltering(exchange)) {
- NonXmlFilterReader reader = createNonXmlFilterReader(exchange, stream);
- answer = unmarshaller.unmarshal(reader);
- } else {
- answer = unmarshaller.unmarshal(stream);
+ if (needFiltering(exchange)) {
+ NonXmlFilterReader reader = createNonXmlFilterReader(exchange, stream);
+ answer = unmarshaller.unmarshal(reader);
+ } else {
+ answer = unmarshaller.unmarshal(stream);
+ }
}
+ } finally {
+ lock.unlock();
}
if (answer instanceof JAXBElement && isIgnoreJAXBElement()) {
@@ -265,6 +272,7 @@ public class JaxbDataFormat extends Serv
if (partClass != null) {
partialClass = camelContext.getClassResolver().resolveMandatoryClass(partClass);
}
+ unmarshaller = getContext().createUnmarshaller();
}
@Override
Modified:
camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java?rev=1324997&r1=1324996&r2=1324997&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jaxb/src/test/java/org/apache/camel/example/DataFormatConcurrentTest.java Wed Apr 11 20:55:43 2012
@@ -16,8 +16,10 @@
*/
package org.apache.camel.example;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -27,13 +29,37 @@ import org.apache.camel.test.junit4.Came
import org.junit.Test;
/**
- * @version
+ * @version
*/
public class DataFormatConcurrentTest extends CamelTestSupport {
private int size = 2000;
@Test
+ public void testUnmarshallConcurrent() throws Exception {
+ int counter = 10000;
+ final String payload = "<purchaseOrder name='Wine' amount='123.45' price='2.22'/>";
+ final CountDownLatch latch = new CountDownLatch(counter);
+ template.setDefaultEndpointUri("direct:unmarshal");
+
+ ExecutorService pool = Executors.newFixedThreadPool(20);
+ //long start = System.currentTimeMillis();
+ for (int i = 0; i < counter; i++) {
+ pool.execute(new Runnable() {
+ public void run() {
+ template.sendBody(payload);
+ latch.countDown();
+ }
+ });
+ }
+
+ // should finish on fast machines in less than 3 seconds
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ //long end = System.currentTimeMillis();
+ //System.out.println("took " + (end - start) + "ms");
+ }
+
+ @Test
public void testSendConcurrent() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(size);
@@ -73,6 +99,10 @@ public class DataFormatConcurrentTest ex
// use seda that supports concurrent consumers for concurrency
from("seda:start?size=" + size + "&concurrentConsumers=5").marshal(jaxb).convertBodyTo(String.class).to("mock:result");
+
+ from("direct:unmarshal")
+ .unmarshal(jaxb)
+ .to("mock:result");
}
};
}