Author: davsclaus
Date: Mon Jan 10 09:03:05 2011
New Revision: 1057133
URL: http://svn.apache.org/viewvc?rev=1057133&view=rev
Log:
CAMEL-3497: Fixed timeout issue when using parallel on multicast/splitter etc.
to fetch any already completed tasks to aggregate during timeout.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Jan 10 09:03:05 2011
@@ -61,6 +61,7 @@ import org.apache.camel.util.ServiceHelp
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AtomicException;
import org.apache.camel.util.concurrent.AtomicExchange;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -115,12 +116,10 @@ public class MulticastProcessor extends
public void begin() {
// noop
- LOG.trace("ProcessorExchangePair #" + index + " begin: " + exchange);
}
public void done() {
// noop
- LOG.trace("ProcessorExchangePair #" + index + " done: " + exchange);
}
}
@@ -412,6 +411,9 @@ public class MulticastProcessor extends
// we are timed out but try to grab if some tasks has been completed
// poll will return null if no tasks is present
future = completion.poll();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polled completion task #" + aggregated + " after timeout to grab already completed tasks: " + future);
+ }
} else if (timeout > 0) {
long left = timeout - watch.taken();
if (left < 0) {
@@ -451,7 +453,14 @@ public class MulticastProcessor extends
// log a WARN we timed out since it will not be aggregated and the Exchange will be lost
LOG.warn("Parallel processing timed out after " + timeout + " millis for number " + aggregated + ". This task will be cancelled and will not be aggregated.");
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Timeout occurred after " + timeout + " millis for number " + aggregated + " task.");
+ }
timedOut = true;
+
+ // mark that index as timed out, which allows us to try to retrieve
+ // any already completed tasks in the next loop
+ ExecutorServiceHelper.timeoutTask(completion);
} else {
// there is a result to aggregate
Exchange subExchange = future.get();
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Mon Jan 10 09:03:05 2011
@@ -17,6 +17,7 @@
package org.apache.camel.util.concurrent;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -347,4 +348,18 @@ public final class ExecutorServiceHelper
return null;
}
+ /**
+ * Timeout the completion service.
+ * <p/>
+ * This can be used to mark the completion service as timed out, allowing you to poll any already completed tasks.
+ * This applies when using the {@link SubmitOrderedCompletionService}.
+ *
+ * @param completionService the completion service.
+ */
+ public static void timeoutTask(CompletionService completionService) {
+ if (completionService instanceof SubmitOrderedCompletionService) {
+ ((SubmitOrderedCompletionService) completionService).timeoutTask();
+ }
+ }
+
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java Mon Jan 10 09:03:05 2011
@@ -16,6 +16,8 @@
*/
package org.apache.camel.util.concurrent;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.DelayQueue;
@@ -65,7 +67,8 @@ public class SubmitOrderedCompletionServ
// if the answer is 0 then this task is ready to be taken
return id - index.get();
}
-
+
+ @SuppressWarnings("unchecked")
public int compareTo(Delayed o) {
SubmitOrderFutureTask other = (SubmitOrderFutureTask) o;
return (int) (this.id - other.id);
@@ -76,6 +79,12 @@ public class SubmitOrderedCompletionServ
// when we are done add to the completion queue
completionQueue.add(this);
}
+
+ @Override
+ public String toString() {
+ // output using zero-based index
+ return "SubmitOrderedFutureTask[" + (id - 1) + "]";
+ }
}
public SubmitOrderedCompletionService(Executor executor) {
@@ -89,7 +98,7 @@ public class SubmitOrderedCompletionServ
}
SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), task);
executor.execute(f);
- return (Future<V>) f;
+ return f;
}
public Future<V> submit(Runnable task, Object result) {
@@ -104,18 +113,37 @@ public class SubmitOrderedCompletionServ
@SuppressWarnings("unchecked")
public Future<V> take() throws InterruptedException {
index.incrementAndGet();
- return (Future) completionQueue.take();
+ return completionQueue.take();
}
@SuppressWarnings("unchecked")
public Future<V> poll() {
index.incrementAndGet();
- return (Future) completionQueue.poll();
+ Future answer = completionQueue.poll();
+ if (answer == null) {
+ // decrease counter if we didnt get any data
+ index.decrementAndGet();
+ }
+ return answer;
}
@SuppressWarnings("unchecked")
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
index.incrementAndGet();
- return (Future) completionQueue.poll(timeout, unit);
+ Future answer = completionQueue.poll(timeout, unit);
+ if (answer == null) {
+ // decrease counter if we didnt get any data
+ index.decrementAndGet();
+ }
+ return answer;
}
+
+ /**
+ * Marks the current task as timeout, which allows you to poll the next
+ * tasks which may already have been completed.
+ */
+ public void timeoutTask() {
+ index.incrementAndGet();
+ }
+
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java Mon Jan 10 09:03:05 2011
@@ -57,7 +57,7 @@ public class AsyncEndpointRecipientListP
beforeThreadName = Thread.currentThread().getName();
}
})
- .recipientList(constant("async:Hi Camel,direct:foo")).parallelProcessing();
+ .recipientList(constant("async:Hi Camel?delay=2000,direct:foo")).parallelProcessing();
from("direct:foo")
.process(new Processor() {
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java
Mon Jan 10 09:03:05 2011
@@ -19,6 +19,7 @@ package org.apache.camel.util.concurrent
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
@@ -85,6 +86,63 @@ public class SubmitOrderedCompletionServ
assertEquals("B", b);
}
+ public void testSubmitOrderedFirstTaskIsSlowUsingPollTimeout() throws Exception {
+
+ service.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ // this task should be slower than B but we should still get it first
+ Thread.sleep(200);
+ return "A";
+ }
+ });
+
+ service.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ return "B";
+ }
+ });
+
+ Object a = service.poll(5, TimeUnit.SECONDS).get();
+ Object b = service.poll(5, TimeUnit.SECONDS).get();
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
+ public void testSubmitOrderedFirstTaskIsSlowUsingPoll() throws Exception {
+
+ service.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ // this task should be slower than B but we should still get it first
+ Thread.sleep(1000);
+ return "A";
+ }
+ });
+
+ service.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ return "B";
+ }
+ });
+
+ // poll should not get it the first time
+ Object a = service.poll();
+ assertNull(a);
+
+ Thread.sleep(100);
+
+ // and neither the 2nd time
+ a = service.poll();
+ assertNull(a);
+
+ // okay take them
+ a = service.take().get();
+ Object b = service.take().get();
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
public void testSubmitOrderedSecondTaskIsSlow() throws Exception {
service.submit(new Callable<Object>() {
@@ -106,4 +164,63 @@ public class SubmitOrderedCompletionServ
assertEquals("A", a);
assertEquals("B", b);
}
+
+ public void testSubmitOrderedSecondTaskIsSlowUsingPollTimeout() throws Exception {
+
+ service.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ return "A";
+ }
+ });
+
+ service.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ Thread.sleep(200);
+ return "B";
+ }
+ });
+
+ Object a = service.poll(5, TimeUnit.SECONDS).get();
+ Object b = service.poll(5, TimeUnit.SECONDS).get();
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
+ public void testSubmitOrderedLastTaskIsSlowUsingPoll() throws Exception {
+
+ service.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ return "A";
+ }
+ });
+
+ service.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ Thread.sleep(1000);
+ return "B";
+ }
+ });
+
+ // take a
+ Object a = service.take().get();
+ assertNotNull(a);
+
+ // poll should not get it the first time
+ Object b = service.poll();
+ assertNull(b);
+
+ Thread.sleep(100);
+
+ // and neither the 2nd time
+ b = service.poll();
+ assertNull(b);
+
+ // okay take it
+ b = service.take().get();
+
+ assertEquals("A", a);
+ assertEquals("B", b);
+ }
+
}