Author: jstrachan
Date: Mon Apr 30 09:41:50 2007
New Revision: 533794
URL: http://svn.apache.org/viewvc?view=rev&rev=533794
Log:
added some helper methods to make it easier to write properly configured
polling consumers. Also added an ExceptionHandler for asynchronous processsors
to deal with async exceptions nicer
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
(with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
Mon Apr 30 09:41:50 2007
@@ -62,6 +62,10 @@
return null;
}
if (parameters != null) {
+ if (endpoint instanceof PollingEndpoint) {
+ PollingEndpoint pollingEndpoint = (PollingEndpoint) endpoint;
+ pollingEndpoint.configureProperties(parameters);
+ }
IntrospectionSupport.setProperties(endpoint, parameters);
}
return endpoint;
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
Mon Apr 30 09:41:50 2007
@@ -21,6 +21,7 @@
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
+import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
/**
@@ -29,6 +30,7 @@
public class DefaultConsumer<E extends Exchange> extends ServiceSupport
implements Consumer<E> {
private Endpoint<E> endpoint;
private Processor<E> processor;
+ private ExceptionHandler exceptionHandler;
public DefaultConsumer(Endpoint<E> endpoint, Processor<E> processor) {
this.endpoint = endpoint;
@@ -43,11 +45,31 @@
return processor;
}
+ public ExceptionHandler getExceptionHandler() {
+ if (exceptionHandler == null) {
+ exceptionHandler = new LoggingExceptionHandler(getClass());
+ }
+ return exceptionHandler;
+ }
+
+ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ }
+
protected void doStop() throws Exception {
ServiceHelper.stopServices(processor);
}
protected void doStart() throws Exception {
ServiceHelper.startServices(processor);
+ }
+
+ /**
+ * Handles the given exception using the [EMAIL PROTECTED]
#getExceptionHandler()}
+ *
+ * @param t the exception to handle
+ */
+ protected void handleException(Throwable t) {
+ getExceptionHandler().handleException(t);
}
}
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java?view=auto&rev=533794
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
Mon Apr 30 09:41:50 2007
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.processor.Logger;
+import org.apache.camel.processor.LoggingLevel;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A default implementation of [EMAIL PROTECTED] ExceptionHandler} which uses
a [EMAIL PROTECTED] Logger} to
+ * log to an arbitrary [EMAIL PROTECTED] Log} with some [EMAIL PROTECTED]
LoggingLevel}
+ *
+ * @version $Revision: 1.1 $
+ */
+public class LoggingExceptionHandler implements ExceptionHandler {
+ private final Logger logger;
+
+ public LoggingExceptionHandler(Class ownerType) {
+ this(new Logger(LogFactory.getLog(ownerType), LoggingLevel.ERROR));
+ }
+
+ public LoggingExceptionHandler(Logger logger) {
+ this.logger = logger;
+ }
+
+ public void handleException(Throwable exception) {
+ logger.log(exception.getMessage(), exception);
+ }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java?view=auto&rev=533794
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
Mon Apr 30 09:41:50 2007
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.util.IntrospectionSupport;
+
+import java.util.Map;
+
+/**
+ * A base class for [EMAIL PROTECTED] Endpoint} which creates a [EMAIL
PROTECTED] PollingConsumer}
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class PollingEndpoint<E extends Exchange> extends
DefaultEndpoint<E> {
+ private Map consumerProperties;
+
+ protected PollingEndpoint(String endpointUri, Component component) {
+ super(endpointUri, component);
+ }
+
+ public Map getConsumerProperties() {
+ return consumerProperties;
+ }
+
+ public void setConsumerProperties(Map consumerProperties) {
+ this.consumerProperties = consumerProperties;
+ }
+
+ protected void configureConsumer(Consumer<E> consumer) {
+ if (consumerProperties != null) {
+ IntrospectionSupport.setProperties(consumer, consumerProperties);
+ }
+ }
+
+ public void configureProperties(Map options) {
+ Map consumerProperties =
IntrospectionSupport.extractProperties(options, "consumer.");
+ if (consumerProperties != null) {
+ setConsumerProperties(consumerProperties);
+ }
+ }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java?view=auto&rev=533794
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
Mon Apr 30 09:41:50 2007
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * A Strategy pattern for handling exceptions; particularly in asynchronous
processes such as consumers
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface ExceptionHandler {
+
+ /**
+ * Handles the given exception
+ *
+ * @param exception the exception
+ */
+ void handleException(Throwable exception);
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
---
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java
(original)
+++
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java
Mon Apr 30 09:41:50 2007
@@ -17,12 +17,12 @@
*/
package org.apache.camel.component.file;
-import java.io.File;
-import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
-import org.apache.camel.util.IntrospectionSupport;
+
+import java.io.File;
+import java.util.Map;
/**
* @version $Revision: 523772 $
@@ -37,8 +37,7 @@
protected Endpoint<FileExchange> createEndpoint(String uri, String
remaining, Map parameters) throws Exception {
File file = new File(remaining);
- FileEndpoint result = new FileEndpoint(file, remaining,
this,parameters);
+ FileEndpoint result = new FileEndpoint(file, remaining, this);
return result;
}
-
}
Modified:
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
---
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
Mon Apr 30 09:41:50 2007
@@ -31,8 +31,8 @@
* @version $Revision: 523016 $
*/
public class FileConsumer extends PollingConsumer<FileExchange>{
-
private static final transient Log
log=LogFactory.getLog(FileConsumer.class);
+
private final FileEndpoint endpoint;
private boolean recursive=true;
private boolean attemptFileLock=false;
Modified:
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
---
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
(original)
+++
activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
Mon Apr 30 09:41:50 2007
@@ -17,32 +17,41 @@
*/
package org.apache.camel.component.file;
-import java.io.File;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.PollingEndpoint;
import org.apache.camel.util.IntrospectionSupport;
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
/**
* @version $Revision: 523016 $
*/
-public class FileEndpoint extends DefaultEndpoint<FileExchange> {
+public class FileEndpoint extends PollingEndpoint<FileExchange> {
private File file;
- private Map parameters;
- protected FileEndpoint(File file,String endpointUri, Component
component,Map parameters){
- super(endpointUri,component);
+ private ScheduledExecutorService executor;
+
+ protected FileEndpoint(File file, String endpointUri, FileComponent
component) {
+ super(endpointUri, component);
this.file = file;
- this.parameters=parameters;
- IntrospectionSupport.setProperties(this, parameters);
+ this.executor = component.getExecutorService();
}
-
- private ScheduledExecutorService executor;
+
+ /**
+ * @return a Producer
+ * @throws Exception
+ * @see org.apache.camel.Endpoint#createProducer()
+ */
+ public Producer<FileExchange> createProducer() throws Exception {
+ Producer<FileExchange> result = new FileProducer(this);
+ return startService(result);
+ }
/**
* @param file
@@ -50,68 +59,51 @@
* @throws Exception
* @see
org.apache.camel.Endpoint#createConsumer(org.apache.camel.Processor)
*/
- public Consumer<FileExchange> createConsumer(Processor<FileExchange> file)
throws Exception{
- Consumer<FileExchange> result = new FileConsumer(this, file,
getExecutor());
- IntrospectionSupport.setProperties(result, parameters);
- return result;
+ public Consumer<FileExchange> createConsumer(Processor<FileExchange> file)
throws Exception {
+ Consumer<FileExchange> result = new FileConsumer(this, file,
getExecutor());
+ configureConsumer(result);
+ return startService(result);
}
/**
- * @param file
+ * @param file
* @return a FileExchange
* @see org.apache.camel.Endpoint#createExchange()
*/
- public FileExchange createExchange(File file){
- return new FileExchange(getContext(),file);
+ public FileExchange createExchange(File file) {
+ return new FileExchange(getContext(), file);
}
-
+
/**
* @return an Exchange
* @see org.apache.camel.Endpoint#createExchange()
*/
- public FileExchange createExchange(){
+ public FileExchange createExchange() {
return createExchange(this.file);
}
-
- /**
- * @return a Producer
- * @throws Exception
- * @see org.apache.camel.Endpoint#createProducer()
- */
- public Producer<FileExchange> createProducer() throws Exception{
- Producer<FileExchange> result = new FileProducer(this);
- IntrospectionSupport.setProperties(result, parameters);
- return result;
- }
-
-
/**
* @return the executor
*/
- public synchronized ScheduledExecutorService getExecutor(){
- if (this.executor==null) {
- this.executor=new ScheduledThreadPoolExecutor(10);
+ public synchronized ScheduledExecutorService getExecutor() {
+ if (this.executor == null) {
+ this.executor = new ScheduledThreadPoolExecutor(10);
}
return executor;
}
-
/**
* @param executor the executor to set
*/
- public synchronized void setExecutor(ScheduledExecutorService executor){
- this.executor=executor;
+ public synchronized void setExecutor(ScheduledExecutorService executor) {
+ this.executor = executor;
}
-
+
public File getFile() {
return file;
}
- public boolean isSingleton() {
- return true;
- }
-
-
-
+ public boolean isSingleton() {
+ return true;
+ }
}
Modified:
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
---
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
(original)
+++
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
Mon Apr 30 09:41:50 2007
@@ -56,14 +56,6 @@
@Override
protected Endpoint<Exchange> createEndpoint(String uri, String path, Map
options) throws Exception {
JpaEndpoint endpoint = new JpaEndpoint(uri, this);
- Map consumerProperties =
IntrospectionSupport.extractProperties(options, "consumer.");
- if (consumerProperties != null) {
- endpoint.setConsumerProperties(consumerProperties);
- }
- Map emProperties = IntrospectionSupport.extractProperties(options,
"emf.");
- if (emProperties != null) {
- endpoint.setEntityManagerProperties(emProperties);
- }
// lets interpret the next string as a class
if (path != null) {
Modified:
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
---
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
(original)
+++
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
Mon Apr 30 09:41:50 2007
@@ -26,6 +26,7 @@
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.PollingEndpoint;
import org.apache.camel.util.IntrospectionSupport;
import org.springframework.orm.jpa.JpaTemplate;
@@ -37,14 +38,13 @@
/**
* @version $Revision$
*/
-public class JpaEndpoint extends DefaultEndpoint<Exchange> {
+public class JpaEndpoint extends PollingEndpoint<Exchange> {
private EntityManagerFactory entityManagerFactory;
private String persistenceUnit = "camel";
private JpaTemplate template;
private Expression<Exchange> producerExpression;
private int maximumResults = -1;
private Class<?> entityType;
- private Map consumerProperties;
private Map entityManagerProperties;
public JpaEndpoint(String uri, JpaComponent component) {
@@ -62,12 +62,23 @@
public Consumer<Exchange> createConsumer(Processor<Exchange> processor)
throws Exception {
JpaConsumer consumer = new JpaConsumer(this, processor,
getExecutorService());
- if (consumerProperties != null) {
- IntrospectionSupport.setProperties(consumer, consumerProperties);
- }
+ configureConsumer(consumer);
return startService(consumer);
}
+ @Override
+ public void configureProperties(Map options) {
+ super.configureProperties(options);
+ Map emProperties = IntrospectionSupport.extractProperties(options,
"emf.");
+ if (emProperties != null) {
+ setEntityManagerProperties(emProperties);
+ }
+ }
+
+ public boolean isSingleton() {
+ return false;
+ }
+
// Properties
//-------------------------------------------------------------------------
public JpaTemplate getTemplate() {
@@ -108,14 +119,6 @@
this.entityType = entityType;
}
- public Map getConsumerProperties() {
- return consumerProperties;
- }
-
- public void setConsumerProperties(Map consumerProperties) {
- this.consumerProperties = consumerProperties;
- }
-
public EntityManagerFactory getEntityManagerFactory() {
if (entityManagerFactory == null) {
entityManagerFactory = createEntityManagerFactory();
@@ -190,9 +193,4 @@
};
}
}
-
- public boolean isSingleton() {
- return true;
- }
-
}