Author: davsclaus
Date: Mon Jul 14 02:21:31 2008
New Revision: 676525

URL: http://svn.apache.org/viewvc?rev=676525&view=rev
Log:
CAMEL-702: A proposed fix for CAMEL-702. Gert will take a look too. Concurrency 
issue with camel-saon.

Added:
    
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
Modified:
    
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java

Modified: 
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java?rev=676525&r1=676524&r2=676525&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
 (original)
+++ 
activemq/camel/trunk/components/camel-saxon/src/main/java/org/apache/camel/component/xquery/XQueryBuilder.java
 Mon Jul 14 02:21:31 2008
@@ -30,7 +30,8 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.xml.transform.Result;
 import javax.xml.transform.Source;
 import javax.xml.transform.dom.DOMResult;
@@ -38,15 +39,6 @@
 
 import org.w3c.dom.Node;
 
-import net.sf.saxon.Configuration;
-import net.sf.saxon.om.DocumentInfo;
-import net.sf.saxon.om.Item;
-import net.sf.saxon.om.SequenceIterator;
-import net.sf.saxon.query.DynamicQueryContext;
-import net.sf.saxon.query.StaticQueryContext;
-import net.sf.saxon.query.XQueryExpression;
-import net.sf.saxon.trans.StaticError;
-import net.sf.saxon.trans.XPathException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Message;
@@ -62,6 +54,15 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import net.sf.saxon.Configuration;
+import net.sf.saxon.om.DocumentInfo;
+import net.sf.saxon.om.Item;
+import net.sf.saxon.om.SequenceIterator;
+import net.sf.saxon.query.DynamicQueryContext;
+import net.sf.saxon.query.StaticQueryContext;
+import net.sf.saxon.query.XQueryExpression;
+import net.sf.saxon.trans.XPathException;
+
 
 /**
  * Creates an XQuery builder
@@ -79,6 +80,8 @@
     private ResultFormat resultsFormat = ResultFormat.DOM;
     private Properties properties = new Properties();
     private Class resultType;
+    private final Semaphore lock = new Semaphore(1);
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
 
     @Override
     public String toString() {
@@ -95,6 +98,19 @@
 
     public Object evaluate(Exchange exchange) {
         try {
+            // handle concurrency issue when initializing, allow only one to 
initialize
+            if (!initialized.get()) {
+                try {
+                    lock.acquire();
+                    if (!initialized.get()) {
+                        initialize();
+                        initialized.set(true);
+                    }
+                } finally {
+                    lock.release();
+                }
+            }
+
             if (resultType != null) {
                 if (resultType.equals(String.class)) {
                     return evaluateAsString(exchange);
@@ -425,11 +441,17 @@
      * been created lets nullify references here
      */
     protected void clearBuilderReferences() {
-        staticQueryContext = null;
-        configuration = null;
+        // TODO: These causes problems if we null them in concurrency 
environments
+        //staticQueryContext = null;
+        //configuration = null;
     }
 
     protected boolean matches(Exchange exchange, List results) {
         return ObjectHelper.matches(results);
     }
+
+    protected void initialize() throws XPathException, IOException {
+        getExpression();
+    }
+
 }

Added: 
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java?rev=676525&view=auto
==============================================================================
--- 
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
 (added)
+++ 
activemq/camel/trunk/components/camel-saxon/src/test/java/org/apache/camel/component/xquery/XQueryConcurrencyTest.java
 Mon Jul 14 02:21:31 2008
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.xquery;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.DeadLetterChannelBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/**
+ * Concurrency test of XQuery.
+ */
+public class XQueryConcurrencyTest extends ContextTestSupport {
+
+    public void testConcurrency() throws Exception {
+        int total = 100;
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(total);
+
+        // setup a task executor to be able send the messages in parallel
+        final CountDownLatch latch = new CountDownLatch(5);
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(5);
+        executor.afterPropertiesSet();
+        for (int i = 0; i < 5; i++) {
+            final int threadCount = i;
+            executor.execute(new Runnable() {
+                public void run() {
+                    // requestbody is InOut pattern and thus we expect a reply 
(JMSReply)
+                    int start = threadCount * 20;
+                    for (int i = 0; i < 20; i++) {
+                        Object response = template.sendBody("seda:in",
+                            "<person id='" + (start + i) + "'>James</person>");
+                    }
+                    latch.countDown();
+                }
+            });
+        }
+        // wait for test completion, timeout after 10 sec to let other unit 
test run to not wait forever
+        latch.await(10000L, TimeUnit.MILLISECONDS);
+        assertEquals("Latch should be zero", 0, latch.getCount());
+
+        mock.assertIsSatisfied();
+        mock.assertNoDuplicates(body());
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // only retry at max 2 times to cather
+                // if set to 0 we can get interal Saxon errors - SENR0001
+                errorHandler(new 
DeadLetterChannelBuilder().maximumRedeliveries(2));
+
+                from("seda:in")
+                    .thread(10)
+                    .transform().xquery("/person/@id", String.class)
+                    .to("mock:result");
+            }
+        };
+    }
+}


Reply via email to