Author: davsclaus
Date: Mon Jun 27 13:53:36 2011
New Revision: 1140159
URL: http://svn.apache.org/viewvc?rev=1140159&view=rev
Log:
CAMEL-3655: Refactored having reusable code in file consumer to support both
scheduled consumer and polling consumer.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1140159&r1=1140158&r2=1140159&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
Mon Jun 27 13:53:36 2011
@@ -42,9 +42,9 @@ import org.slf4j.LoggerFactory;
*/
public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer
implements BatchConsumer, ShutdownAware {
protected final transient Logger log = LoggerFactory.getLogger(getClass());
+ protected final ProcessFile processFile;
protected GenericFileEndpoint<T> endpoint;
protected GenericFileOperations<T> operations;
- protected boolean loggedIn;
protected String fileExpressionResult;
protected int maxMessagesPerPoll;
protected volatile ShutdownRunningTask shutdownRunningTask;
@@ -54,6 +54,13 @@ public abstract class GenericFileConsume
super(endpoint, processor);
this.endpoint = endpoint;
this.operations = operations;
+ this.processFile = new ProcessFile(this);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public GenericFileEndpoint<T> getEndpoint() {
+ return (GenericFileEndpoint<T>) super.getEndpoint();
}
/**
@@ -74,7 +81,7 @@ public abstract class GenericFileConsume
// gather list of files to process
List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
- String name = endpoint.getConfiguration().getDirectory();
+ String name = getEndpoint().getConfiguration().getDirectory();
// time how long time it takes to poll
StopWatch stop = new StopWatch();
@@ -90,21 +97,21 @@ public abstract class GenericFileConsume
}
// sort files using file comparator if provided
- if (endpoint.getSorter() != null) {
- Collections.sort(files, endpoint.getSorter());
+ if (getEndpoint().getSorter() != null) {
+ Collections.sort(files, getEndpoint().getSorter());
}
// sort using build in sorters so we can use expressions
LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
for (GenericFile<T> file : files) {
- Exchange exchange = endpoint.createExchange(file);
- endpoint.configureExchange(exchange);
- endpoint.configureMessage(file, exchange.getIn());
+ Exchange exchange = getEndpoint().createExchange(file);
+ getEndpoint().configureExchange(exchange);
+ getEndpoint().configureMessage(file, exchange.getIn());
exchanges.add(exchange);
}
// sort files using exchange comparator if provided
- if (endpoint.getSortBy() != null) {
- Collections.sort(exchanges, endpoint.getSortBy());
+ if (getEndpoint().getSortBy() != null) {
+ Collections.sort(exchanges, getEndpoint().getSortBy());
}
// consume files one by one
@@ -156,7 +163,7 @@ public abstract class GenericFileConsume
Exchange exchange = (Exchange) exchanges.poll();
GenericFile<T> file = (GenericFile<T>)
exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
String key = file.getAbsoluteFilePath();
- endpoint.getInProgressRepository().remove(key);
+ getEndpoint().getInProgressRepository().remove(key);
}
return total;
@@ -251,83 +258,22 @@ public abstract class GenericFileConsume
}
/**
+ * Gets the operations to be used
+ *
+ * @return the operations
+ */
+ public GenericFileOperations<T> getOperations() {
+ return operations;
+ }
+
+ /**
* Processes the exchange
*
* @param exchange the exchange
*/
protected void processExchange(final Exchange exchange) {
- GenericFile<T> file = getExchangeFileProperty(exchange);
- log.trace("Processing file: {}", file);
-
- // must extract the absolute name before the begin strategy as the
file could potentially be pre moved
- // and then the file name would be changed
- String absoluteFileName = file.getAbsoluteFilePath();
-
- // check if we can begin processing the file
- try {
- final GenericFileProcessStrategy<T> processStrategy =
endpoint.getGenericFileProcessStrategy();
-
- boolean begin = processStrategy.begin(operations, endpoint,
exchange, file);
- if (!begin) {
- log.debug(endpoint + " cannot begin processing file: {}",
file);
- // begin returned false, so remove file from the in progress
list as its no longer in progress
- endpoint.getInProgressRepository().remove(absoluteFileName);
- return;
- }
- } catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug(endpoint + " cannot begin processing file: " + file
+ " due to: " + e.getMessage(), e);
- }
- endpoint.getInProgressRepository().remove(absoluteFileName);
- return;
- }
-
- // must use file from exchange as it can be updated due the
- // preMoveNamePrefix/preMoveNamePostfix options
- final GenericFile<T> target = getExchangeFileProperty(exchange);
- // must use full name when downloading so we have the correct path
- final String name = target.getAbsoluteFilePath();
- try {
- // retrieve the file using the stream
- log.trace("Retrieving file: {} from: {}", name, endpoint);
-
- // retrieve the file and check it was a success
- boolean retrieved = operations.retrieveFile(name, exchange);
- if (!retrieved) {
- // throw exception to handle the problem with retrieving the
file
- // then if the method return false or throws an exception is
handled the same in here
- // as in both cases an exception is being thrown
- throw new GenericFileOperationFailedException("Cannot retrieve
file: " + file + " from: " + endpoint);
- }
-
- log.trace("Retrieved file: {} from: {}", name, endpoint);
-
- // register on completion callback that does the completion
strategies
- // (for instance to move the file after we have processed it)
- exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint,
operations, target, absoluteFileName));
-
- log.debug("About to process file: {} using exchange: {}", target,
exchange);
-
- // process the exchange using the async consumer to support async
routing engine
- // which can be supported by this file consumer as all the done
work is
- // provided in the GenericFileOnCompletion
- getAsyncProcessor().process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // noop
- if (log.isTraceEnabled()) {
- log.trace("Done processing file: {} {}", target,
doneSync ? "synchronously" : "asynchronously");
- }
- }
- });
-
- } catch (Exception e) {
- // remove file from the in progress list due to failure
- // (cannot be in finally block due to GenericFileOnCompletion will
remove it
- // from in progress when it takes over and processes the file,
which may happen
- // by another thread at a later time. So its only safe to remove
it if there was an exception)
- endpoint.getInProgressRepository().remove(absoluteFileName);
- handleException(e);
- }
+ // let the process do the work
+ processFile.processExchange(exchange);
}
/**
@@ -341,7 +287,7 @@ public abstract class GenericFileConsume
if (!isMatched(file, isDirectory)) {
log.trace("File did not match. Will skip this file: {}", file);
return false;
- } else if (endpoint.isIdempotent() &&
endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
+ } else if (getEndpoint().isIdempotent() &&
getEndpoint().getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
log.trace("This consumer is idempotent and the file has been
consumed before. Will skip this file: {}", file);
return false;
}
@@ -382,26 +328,26 @@ public abstract class GenericFileConsume
return true;
}
- if (endpoint.getFilter() != null) {
- if (!endpoint.getFilter().accept(file)) {
+ if (getEndpoint().getFilter() != null) {
+ if (!getEndpoint().getFilter().accept(file)) {
return false;
}
}
- if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
- if (name.matches(endpoint.getExclude())) {
+ if (ObjectHelper.isNotEmpty(getEndpoint().getExclude())) {
+ if (name.matches(getEndpoint().getExclude())) {
return false;
}
}
- if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
- if (!name.matches(endpoint.getInclude())) {
+ if (ObjectHelper.isNotEmpty(getEndpoint().getInclude())) {
+ if (!name.matches(getEndpoint().getInclude())) {
return false;
}
}
// use file expression for a simple dynamic file filter
- if (endpoint.getFileName() != null) {
+ if (getEndpoint().getFileName() != null) {
evaluateFileExpression();
if (fileExpressionResult != null) {
if (!name.equals(fileExpressionResult)) {
@@ -411,19 +357,19 @@ public abstract class GenericFileConsume
}
// if done file name is enabled, then the file is only valid if a done
file exists
- if (endpoint.getDoneFileName() != null) {
+ if (getEndpoint().getDoneFileName() != null) {
// done file must be in same path as the file
- String doneFileName =
endpoint.createDoneFileName(file.getAbsoluteFilePath());
- ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
+ String doneFileName =
getEndpoint().createDoneFileName(file.getAbsoluteFilePath());
+ ObjectHelper.notEmpty(doneFileName, "doneFileName", getEndpoint());
// is it a done file name?
- if (endpoint.isDoneFile(file.getFileNameOnly())) {
+ if (getEndpoint().isDoneFile(file.getFileNameOnly())) {
log.trace("Skipping done file: {}", file);
return false;
}
// the file is only valid if the done file exist
- if (!operations.existsFile(doneFileName)) {
+ if (!getOperations().existsFile(doneFileName)) {
log.trace("Done file: {} does not exist", doneFileName);
return false;
}
@@ -440,13 +386,13 @@ public abstract class GenericFileConsume
*/
protected boolean isInProgress(GenericFile<T> file) {
String key = file.getAbsoluteFilePath();
- return !endpoint.getInProgressRepository().add(key);
+ return !getEndpoint().getInProgressRepository().add(key);
}
private void evaluateFileExpression() {
if (fileExpressionResult == null) {
// create a dummy exchange as Exchange is needed for expression
evaluation
- Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
+ Exchange dummy = new
DefaultExchange(getEndpoint().getCamelContext());
fileExpressionResult = endpoint.getFileName().evaluate(dummy,
String.class);
}
}
@@ -461,6 +407,35 @@ public abstract class GenericFileConsume
super.doStart();
// prepare on startup
- endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations,
endpoint);
+
getEndpoint().getGenericFileProcessStrategy().prepareOnStartup(getOperations(),
getEndpoint());
}
+
+ /**
+ * Class the processes the exchange when a file has been polled.
+ */
+ private class ProcessFile extends GenericFileConsumerSupport<T> {
+
+ public ProcessFile(GenericFileConsumer<T> consumer) {
+ super(consumer);
+ }
+
+ @Override
+ void handleExceptionStrategy(Exception e) {
+ // handle the exception on the consumer
+ handleException(e);
+ }
+
+ @Override
+ void processFileStrategy(Exchange exchange) {
+ // process the exchange using the async consumer to support async
routing engine
+ // which can be supported by this file consumer as all the done
work is
+ // provided in the GenericFileOnCompletion
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // noop
+ }
+ });
+ }
+ }
+
}
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java?rev=1140159&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
Mon Jun 27 13:53:36 2011
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Support class for both a regular {@link org.apache.camel.Consumer}
+ * and a {@link org.apache.camel.PollingConsumer} using the file component.
+ * <p/>
+ * This class contains shared code between the two kind of consumers, to reuse
logic.
+ * <p/>
+ * The method {@link #processExchange(org.apache.camel.Exchange)} should be
invoked to
+ * process the consumed file. Then custom implementations can implement the
strategy
+ * method for their custom logic.
+ */
+public abstract class GenericFileConsumerSupport<T> {
+ protected final transient Logger log = LoggerFactory.getLogger(getClass());
+ protected final GenericFileConsumer<T> consumer;
+
+ public GenericFileConsumerSupport(GenericFileConsumer<T> consumer) {
+ this.consumer = consumer;
+ }
+
+ /**
+ * Strategy to process the consumed file
+ *
+ * @param exchange the exchange with the file details
+ */
+ abstract void processFileStrategy(Exchange exchange);
+
+ /**
+ * Strategy to handle the exception thrown that occurred while processing
the consumer file
+ * <p/>
+ * Implementations will usually delegate to a {@link
org.apache.camel.spi.ExceptionHandler}
+ * to handle the given exception.
+ *
+ * @param e the caused exception
+ */
+ abstract void handleExceptionStrategy(Exception e);
+
+ /**
+ * Processes the exchange.
+ * <p/>
+ * This method should be invoked to process the consumed file
+ *
+ * @param exchange the exchange
+ */
+ protected void processExchange(final Exchange exchange) {
+ GenericFile<T> file = getExchangeFileProperty(exchange);
+ log.trace("Processing file: {}", file);
+
+ // must extract the absolute name before the begin strategy as the
file could potentially be pre moved
+ // and then the file name would be changed
+ String absoluteFileName = file.getAbsoluteFilePath();
+
+ // check if we can begin processing the file
+ try {
+ final GenericFileProcessStrategy<T> processStrategy =
consumer.getEndpoint().getGenericFileProcessStrategy();
+
+ boolean begin = processStrategy.begin(consumer.getOperations(),
consumer.getEndpoint(), exchange, file);
+ if (!begin) {
+ log.debug(consumer.getEndpoint() + " cannot begin processing
file: {}", file);
+ // begin returned false, so remove file from the in progress
list as its no longer in progress
+
consumer.getEndpoint().getInProgressRepository().remove(absoluteFileName);
+ return;
+ }
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug(consumer.getEndpoint() + " cannot begin processing
file: " + file + " due to: " + e.getMessage(), e);
+ }
+
consumer.getEndpoint().getInProgressRepository().remove(absoluteFileName);
+ return;
+ }
+
+ // must use file from exchange as it can be updated due the
+ // preMoveNamePrefix/preMoveNamePostfix options
+ final GenericFile<T> target = getExchangeFileProperty(exchange);
+ // must use full name when downloading so we have the correct path
+ final String name = target.getAbsoluteFilePath();
+ try {
+ // retrieve the file using the stream
+ log.trace("Retrieving file: {} from: {}", name,
consumer.getEndpoint());
+
+ // retrieve the file and check it was a success
+ boolean retrieved = consumer.getOperations().retrieveFile(name,
exchange);
+ if (!retrieved) {
+ // throw exception to handle the problem with retrieving the
file
+ // then if the method return false or throws an exception is
handled the same in here
+ // as in both cases an exception is being thrown
+ throw new GenericFileOperationFailedException("Cannot retrieve
file: " + file + " from: " + consumer.getEndpoint());
+ }
+
+ log.trace("Retrieved file: {} from: {}", name,
consumer.getEndpoint());
+
+ // register on completion callback that does the completion
strategies
+ // (for instance to move the file after we have processed it)
+ exchange.addOnCompletion(new
GenericFileOnCompletion<T>(consumer.getEndpoint(), consumer.getOperations(),
target, absoluteFileName));
+
+ log.debug("About to process file: {} using exchange: {}", target,
exchange);
+
+ // process the file
+ processFileStrategy(exchange);
+
+ } catch (Exception e) {
+ // remove file from the in progress list due to failure
+ // (cannot be in finally block due to GenericFileOnCompletion will
remove it
+ // from in progress when it takes over and processes the file,
which may happen
+ // by another thread at a later time. So its only safe to remove
it if there was an exception)
+
consumer.getEndpoint().getInProgressRepository().remove(absoluteFileName);
+ handleExceptionStrategy(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
+ return (GenericFile<T>)
exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
+ }
+
+}
Modified:
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1140159&r1=1140158&r2=1140159&view=diff
==============================================================================
---
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
(original)
+++
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
Mon Jun 27 13:53:36 2011
@@ -39,11 +39,14 @@ public abstract class RemoteFileConsumer
return (RemoteFileEndpoint<T>) super.getEndpoint();
}
- protected RemoteFileOperations getOperations() {
+ public RemoteFileOperations getOperations() {
return (RemoteFileOperations) operations;
}
protected boolean prePollCheck() throws Exception {
+ if (log.isTraceEnabled()) {
+ log.trace("prePollCheck on " +
getEndpoint().getConfiguration().remoteServerInformation());
+ }
try {
if (getEndpoint().getMaximumReconnectAttempts() > 0) {
// only use recoverable if we are allowed any re-connect
attempts
@@ -71,6 +74,9 @@ public abstract class RemoteFileConsumer
@Override
protected void postPollCheck() {
+ if (log.isTraceEnabled()) {
+ log.trace("postPollCheck on " +
getEndpoint().getConfiguration().remoteServerInformation());
+ }
if (getEndpoint().isDisconnect()) {
log.trace("postPollCheck disconnect from: {}", getEndpoint());
disconnect();