Author: cschneider
Date: Mon Sep 5 13:54:44 2011
New Revision: 1165283
URL: http://svn.apache.org/viewvc?rev=1165283&view=rev
Log:
CAMEL-4417 Move AsnycProcessorTypeConverter.convert method to processor while
keeping the old one for compatibility
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
Mon Sep 5 13:54:44 2011
@@ -21,7 +21,7 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ public class DirectProducer extends Defa
callback.done(true);
return true;
} else {
- AsyncProcessor processor =
AsyncProcessorTypeConverter.convert(endpoint.getConsumer().getProcessor());
+ AsyncProcessor processor =
AsyncProcessorConverterHelper.convert(endpoint.getConsumer().getProcessor());
return AsyncProcessorHelper.process(processor, exchange, callback);
}
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
Mon Sep 5 13:54:44 2011
@@ -31,7 +31,7 @@ import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.SuspendableService;
import org.apache.camel.impl.LoggingExceptionHandler;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.ShutdownAware;
@@ -61,7 +61,7 @@ public class SedaConsumer extends Servic
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
this.endpoint = endpoint;
- this.processor = AsyncProcessorTypeConverter.convert(processor);
+ this.processor = AsyncProcessorConverterHelper.convert(processor);
}
@Override
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
Mon Sep 5 13:54:44 2011
@@ -20,7 +20,7 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
@@ -64,7 +64,7 @@ public class DefaultConsumer extends Ser
*/
public synchronized AsyncProcessor getAsyncProcessor() {
if (asyncProcessor == null) {
- asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
+ asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
}
return asyncProcessor;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
Mon Sep 5 13:54:44 2011
@@ -30,7 +30,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerCallback;
import org.apache.camel.ServicePoolAware;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.processor.UnitOfWorkProducer;
import org.apache.camel.spi.ServicePool;
import org.apache.camel.support.ServiceSupport;
@@ -281,7 +281,7 @@ public class ProducerCache extends Servi
try {
// invoke the callback
- AsyncProcessor asyncProcessor =
AsyncProcessorTypeConverter.convert(producer);
+ AsyncProcessor asyncProcessor =
AsyncProcessorConverterHelper.convert(producer);
sync = producerCallback.doInAsyncProducer(producer,
asyncProcessor, exchange, pattern, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
Mon Sep 5 13:54:44 2011
@@ -16,13 +16,12 @@
*/
package org.apache.camel.impl.converter;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
-import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
/**
* A simple converter that can convert any {@link Processor} to an {@link
AsyncProcessor}.
@@ -33,49 +32,11 @@ import org.apache.camel.processor.Delega
*/
public class AsyncProcessorTypeConverter implements TypeConverter {
- private static final class ProcessorToAsyncProcessorBridge extends
DelegateProcessor implements AsyncProcessor {
-
- private ProcessorToAsyncProcessorBridge(Processor processor) {
- super(processor);
- }
-
- public boolean process(Exchange exchange, AsyncCallback callback) {
- if (processor == null) {
- // no processor then we are done
- callback.done(true);
- return true;
- }
- try {
- processor.process(exchange);
- } catch (Throwable e) {
- // must catch throwable so we catch all
- exchange.setException(e);
- } finally {
- // we are bridging a sync processor as async so callback with
true
- callback.done(true);
- }
- return true;
- }
-
- @Override
- public String toString() {
- if (processor != null) {
- return processor.toString();
- } else {
- return "Processor is null";
- }
- }
- }
-
public <T> T convertTo(Class<T> type, Object value) {
if (value != null) {
if (type.equals(AsyncProcessor.class)) {
- if (value instanceof AsyncProcessor) {
- return type.cast(value);
- } else if (value instanceof Processor) {
- // Provide an async bridge to the regular processor.
- final Processor processor = (Processor)value;
- return type.cast(new
ProcessorToAsyncProcessorBridge(processor));
+ if (value instanceof Processor) {
+ return
type.cast(AsyncProcessorConverterHelper.convert((Processor)value));
}
}
}
@@ -93,11 +54,14 @@ public class AsyncProcessorTypeConverter
public <T> T mandatoryConvertTo(Class<T> type, Exchange exchange, Object
value) throws NoTypeConversionAvailableException {
return convertTo(type, exchange, value);
}
-
+
+ /**
+ * @deprecated use AnycProcessorConverter.convert instead
+ * @param value
+ * @return
+ */
+ @Deprecated
public static AsyncProcessor convert(Processor value) {
- if (value instanceof AsyncProcessor) {
- return (AsyncProcessor)value;
- }
- return new ProcessorToAsyncProcessorBridge(value);
+ return AsyncProcessorConverterHelper.convert(value);
}
}
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java?rev=1165283&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java
Mon Sep 5 13:54:44 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * A simple converter that can convert any {@link Processor} to an {@link
AsyncProcessor}.
+ * Processing will still occur synchronously but it will provide the required
+ * notifications that the caller expects.
+ *
+ * @version
+ */
+public final class AsyncProcessorConverterHelper {
+
+ private AsyncProcessorConverterHelper() {
+ // Helper class
+ }
+
+ private static final class ProcessorToAsyncProcessorBridge extends
DelegateProcessor implements AsyncProcessor {
+
+ private ProcessorToAsyncProcessorBridge(Processor processor) {
+ super(processor);
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ if (processor == null) {
+ // no processor then we are done
+ callback.done(true);
+ return true;
+ }
+ try {
+ processor.process(exchange);
+ } catch (Throwable e) {
+ // must catch throwable so we catch all
+ exchange.setException(e);
+ } finally {
+ // we are bridging a sync processor as async so callback with
true
+ callback.done(true);
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ if (processor != null) {
+ return processor.toString();
+ } else {
+ return "Processor is null";
+ }
+ }
+ }
+
+ public static AsyncProcessor convert(Processor value) {
+ if (value instanceof AsyncProcessor) {
+ return (AsyncProcessor)value;
+ }
+ return new ProcessorToAsyncProcessorBridge(value);
+ }
+}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
Mon Sep 5 13:54:44 2011
@@ -25,7 +25,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
@@ -46,7 +45,7 @@ public class ChoiceProcessor extends Ser
public ChoiceProcessor(List<FilterProcessor> filters, Processor otherwise)
{
this.filters = filters;
- this.otherwise = AsyncProcessorTypeConverter.convert(otherwise);
+ this.otherwise = AsyncProcessorConverterHelper.convert(otherwise);
}
public void process(Exchange exchange) throws Exception {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
Mon Sep 5 13:54:44 2011
@@ -29,7 +29,6 @@ import org.apache.camel.Channel;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Service;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.interceptor.StreamCaching;
import org.apache.camel.processor.interceptor.TraceFormatter;
@@ -301,7 +300,7 @@ public class DefaultChannel extends Serv
exchange.getUnitOfWork().pushRouteContext(routeContext);
}
- AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor);
+ AsyncProcessor async =
AsyncProcessorConverterHelper.convert(processor);
boolean sync = async.process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// pop the route context we just used
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
Mon Sep 5 13:54:44 2011
@@ -24,7 +24,6 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
@@ -53,7 +52,7 @@ public class DelegateAsyncProcessor exte
}
public DelegateAsyncProcessor(Processor processor) {
- this(AsyncProcessorTypeConverter.convert(processor));
+ this(AsyncProcessorConverterHelper.convert(processor));
}
@Override
@@ -70,7 +69,7 @@ public class DelegateAsyncProcessor exte
}
public void setProcessor(Processor processor) {
- this.processor = AsyncProcessorTypeConverter.convert(processor);
+ this.processor = AsyncProcessorConverterHelper.convert(processor);
}
protected void doStart() throws Exception {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
Mon Sep 5 13:54:44 2011
@@ -22,7 +22,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -109,7 +108,7 @@ public class Enricher extends ServiceSup
public boolean process(final Exchange exchange, final AsyncCallback
callback) {
final Exchange resourceExchange = createResourceExchange(exchange,
ExchangePattern.InOut);
- AsyncProcessor ap = AsyncProcessorTypeConverter.convert(producer);
+ AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
boolean sync = AsyncProcessorHelper.process(ap, resourceExchange, new
AsyncCallback() {
public void done(boolean doneSync) {
// we only have to handle async completion of the routing slip
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
Mon Sep 5 13:54:44 2011
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
@@ -53,7 +52,7 @@ public class InterceptorToAsyncProcessor
* @param target the target
*/
public InterceptorToAsyncProcessorBridge(Processor interceptor,
AsyncProcessor target) {
- this.interceptor = AsyncProcessorTypeConverter.convert(interceptor);
+ this.interceptor = AsyncProcessorConverterHelper.convert(interceptor);
this.target = target;
}
@@ -89,7 +88,7 @@ public class InterceptorToAsyncProcessor
}
public void setTarget(Processor target) {
- this.target = AsyncProcessorTypeConverter.convert(target);
+ this.target = AsyncProcessorConverterHelper.convert(target);
}
@Override
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Mon Sep 5 13:54:44 2011
@@ -45,7 +45,6 @@ import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.builder.ErrorHandlerBuilder;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
import org.apache.camel.spi.RouteContext;
@@ -569,7 +568,7 @@ public class MulticastProcessor extends
}
// let the prepared process it, remember to begin the exchange pair
- AsyncProcessor async =
AsyncProcessorTypeConverter.convert(processor);
+ AsyncProcessor async =
AsyncProcessorConverterHelper.convert(processor);
pair.begin();
sync = AsyncProcessorHelper.process(async, exchange, new
AsyncCallback() {
public void done(boolean doneSync) {
@@ -701,7 +700,7 @@ public class MulticastProcessor extends
// let the prepared process it, remember to begin the exchange pair
// we invoke it synchronously as parallel async routing is too hard
- AsyncProcessor async =
AsyncProcessorTypeConverter.convert(processor);
+ AsyncProcessor async =
AsyncProcessorConverterHelper.convert(processor);
pair.begin();
AsyncProcessorHelper.process(async, exchange);
} finally {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
Mon Sep 5 13:54:44 2011
@@ -25,7 +25,6 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.slf4j.Logger;
@@ -75,7 +74,7 @@ public class Pipeline extends MulticastP
// get the next processor
Processor processor = processors.next();
- AsyncProcessor async =
AsyncProcessorTypeConverter.convert(processor);
+ AsyncProcessor async =
AsyncProcessorConverterHelper.convert(processor);
boolean sync = process(exchange, nextExchange, callback,
processors, async);
// continue as long its being processed synchronously
@@ -123,7 +122,7 @@ public class Pipeline extends MulticastP
// continue processing the pipeline asynchronously
Exchange nextExchange = exchange;
while (continueRouting(processors, nextExchange)) {
- AsyncProcessor processor =
AsyncProcessorTypeConverter.convert(processors.next());
+ AsyncProcessor processor =
AsyncProcessorConverterHelper.convert(processors.next());
// check for error if so we should break out
if (!continueProcessing(nextExchange, "so breaking out of
pipeline", LOG)) {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
Mon Sep 5 13:54:44 2011
@@ -29,7 +29,6 @@ import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.spi.SubUnitOfWorkCallback;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -185,7 +184,7 @@ public abstract class RedeliveryErrorHan
this.redeliveryProcessor = redeliveryProcessor;
this.deadLetter = deadLetter;
this.output = output;
- this.outputAsync = AsyncProcessorTypeConverter.convert(output);
+ this.outputAsync = AsyncProcessorConverterHelper.convert(output);
this.redeliveryPolicy = redeliveryPolicy;
this.logger = logger;
this.deadLetterUri = deadLetterUri;
@@ -695,7 +694,7 @@ public abstract class RedeliveryErrorHan
exchange.setProperty(Exchange.FAILURE_ENDPOINT,
exchange.getProperty(Exchange.TO_ENDPOINT));
// the failure processor could also be asynchronous
- AsyncProcessor afp =
AsyncProcessorTypeConverter.convert(processor);
+ AsyncProcessor afp =
AsyncProcessorConverterHelper.convert(processor);
sync = AsyncProcessorHelper.process(afp, exchange, new
AsyncCallback() {
public void done(boolean sync) {
log.trace("Failure processor done: {} processing Exchange:
{}", processor, exchange);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
Mon Sep 5 13:54:44 2011
@@ -26,7 +26,6 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
@@ -48,7 +47,7 @@ public class TryProcessor extends Servic
private List<AsyncProcessor> processors;
public TryProcessor(Processor tryProcessor, List<CatchProcessor>
catchClauses, Processor finallyProcessor) {
- this.tryProcessor = AsyncProcessorTypeConverter.convert(tryProcessor);
+ this.tryProcessor =
AsyncProcessorConverterHelper.convert(tryProcessor);
this.catchProcessor = new DoCatchProcessor(catchClauses);
this.finallyProcessor = new DoFinallyProcessor(finallyProcessor);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
Mon Sep 5 13:54:44 2011
@@ -25,7 +25,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -53,7 +53,7 @@ public class IdempotentConsumer extends
this.idempotentRepository = idempotentRepository;
this.eager = eager;
this.skipDuplicate = skipDuplicate;
- this.processor = AsyncProcessorTypeConverter.convert(processor);
+ this.processor = AsyncProcessorConverterHelper.convert(processor);
}
@Override
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
Mon Sep 5 13:54:44 2011
@@ -23,7 +23,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.processor.Traceable;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ObjectHelper;
@@ -208,7 +208,7 @@ public class FailOverLoadBalancer extend
}
log.debug("Processing failover at attempt {} for {}", attempts,
exchange);
- AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
+ AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
return AsyncProcessorHelper.process(albp, exchange, new
FailOverAsyncCallback(exchange, attempts, index, callback, processors));
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
Mon Sep 5 13:54:44 2011
@@ -22,7 +22,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
/**
@@ -40,7 +40,7 @@ public abstract class QueueLoadBalancer
if (processor == null) {
throw new IllegalStateException("No processors could be chosen
to process " + exchange);
} else {
- AsyncProcessor albp =
AsyncProcessorTypeConverter.convert(processor);
+ AsyncProcessor albp =
AsyncProcessorConverterHelper.convert(processor);
boolean sync = AsyncProcessorHelper.process(albp, exchange,
new AsyncCallback() {
public void done(boolean doneSync) {
// only handle the async case
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
Mon Sep 5 13:54:44 2011
@@ -23,8 +23,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.spi.Policy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -121,7 +121,7 @@ public class AsyncEndpointPolicyTest ext
invoked++;
// let the original processor continue routing
exchange.getIn().setHeader(name, "was wrapped");
- AsyncProcessor ap =
AsyncProcessorTypeConverter.convert(processor);
+ AsyncProcessor ap =
AsyncProcessorConverterHelper.convert(processor);
boolean sync = ap.process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// we only have to handle async completion of this
policy
Modified:
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
(original)
+++
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
Mon Sep 5 13:54:44 2011
@@ -28,7 +28,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultExchangeHolder;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ public class HazelcastSedaConsumer exten
public HazelcastSedaConsumer(final Endpoint endpoint, final Processor
processor) {
super(endpoint, processor);
this.endpoint = (HazelcastSedaEndpoint) endpoint;
- this.processor = AsyncProcessorTypeConverter.convert(processor);
+ this.processor = AsyncProcessorConverterHelper.convert(processor);
}
@Override
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
Mon Sep 5 13:54:44 2011
@@ -23,7 +23,7 @@ import org.apache.camel.ShutdownRunningT
import org.apache.camel.SuspendableService;
import org.apache.camel.component.routebox.RouteboxConsumer;
import org.apache.camel.component.routebox.RouteboxServiceSupport;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.spi.ShutdownAware;
public class RouteboxDirectConsumer extends RouteboxServiceSupport implements
RouteboxConsumer, ShutdownAware, SuspendableService {
@@ -78,7 +78,7 @@ public class RouteboxDirectConsumer exte
*/
public synchronized AsyncProcessor getAsyncProcessor() {
if (asyncProcessor == null) {
- asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
+ asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
}
return asyncProcessor;
}
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
Mon Sep 5 13:54:44 2011
@@ -26,7 +26,7 @@ import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.routebox.RouteboxServiceSupport;
import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +70,7 @@ public class RouteboxDirectProducer exte
RouteboxDispatcher dispatcher = new
RouteboxDispatcher(producer);
exchange = dispatcher.dispatchAsync(getRouteboxEndpoint(),
exchange);
if (getRouteboxEndpoint().getConfig().isSendToConsumer()) {
- AsyncProcessor processor =
AsyncProcessorTypeConverter.convert(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor());
+ AsyncProcessor processor =
AsyncProcessorConverterHelper.convert(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor());
flag = AsyncProcessorHelper.process(processor, exchange,
new AsyncCallback() {
public void done(boolean doneSync) {
// we only have to handle async completion of this
policy
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
---
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
(original)
+++
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
Mon Sep 5 13:54:44 2011
@@ -29,7 +29,7 @@ import org.apache.camel.ShutdownRunningT
import org.apache.camel.component.routebox.RouteboxConsumer;
import org.apache.camel.component.routebox.RouteboxServiceSupport;
import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.util.AsyncProcessorHelper;
import org.slf4j.Logger;
@@ -42,7 +42,7 @@ public class RouteboxSedaConsumer extend
public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor
processor) {
super(endpoint);
- this.setProcessor(AsyncProcessorTypeConverter.convert(processor));
+ this.setProcessor(AsyncProcessorConverterHelper.convert(processor));
this.producer = endpoint.getConfig().getInnerProducerTemplate();
}