Author: lhein
Date: Fri Jul 4 04:58:18 2008
New Revision: 674020
URL: http://svn.apache.org/viewvc?rev=674020&view=rev
Log:
reworked endpoint to use async send only (SM-1441)
Modified:
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
Modified:
servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java?rev=674020&r1=674019&r2=674020&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java Fri Jul 4 04:58:18 2008
@@ -22,7 +22,8 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import javax.jbi.JBIException;
@@ -61,6 +62,7 @@
private File archive;
private FileMarshaler marshaler = new DefaultFileMarshaler();
private LockManager lockManager;
+ private ConcurrentMap<String, File> openExchanges;
public FilePollerEndpoint() {
}
@@ -73,6 +75,17 @@
super(component, endpoint);
}
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.common.endpoints.PollingEndpoint#start()
+ */
+ @Override
+ public synchronized void start() throws Exception {
+ super.start();
+
+ // create the openExchanges map
+ this.openExchanges = new ConcurrentHashMap<String, File>();
+ }
+
public void poll() throws Exception {
pollFileOrDirectory(file);
}
@@ -193,7 +206,7 @@
public void setArchive(File archive) {
this.archive = archive;
}
-
+
// Implementation methods
//-------------------------------------------------------------------------
@@ -225,14 +238,7 @@
String uri = file.toURI().relativize(aFile.toURI()).toString();
Lock lock = lockManager.getLock(uri);
if (lock.tryLock()) {
- boolean unlock = true;
- try {
- unlock = processFileAndDelete(aFile);
- } finally {
- if (unlock) {
- lock.unlock();
- }
- }
+ processFileNow(aFile);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Unable to acquire lock on " + aFile);
@@ -242,55 +248,30 @@
});
}
- protected boolean processFileAndDelete(File aFile) {
- boolean unlock = true;
+ protected void processFileNow(File aFile) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Processing file " + aFile);
}
if (aFile.exists()) {
processFile(aFile);
- unlock = false;
- if (isDeleteFile()) {
- if (archive != null) {
- moveFile(aFile, archive);
- } else {
- if (!aFile.delete()) {
- throw new IOException("Could not delete file " + aFile);
- }
- }
- unlock = true;
- }
}
} catch (Exception e) {
logger.error("Failed to process file: " + aFile + ". Reason: " + e, e);
}
- return unlock;
}
protected void processFile(File aFile) throws Exception {
InputStream in = null;
- try {
- String name = aFile.getCanonicalPath();
- in = new BufferedInputStream(new FileInputStream(aFile));
- InOnly exchange = getExchangeFactory().createInOnlyExchange();
- configureExchangeTarget(exchange);
- NormalizedMessage message = exchange.createMessage();
- exchange.setInMessage(message);
- marshaler.readMessage(exchange, message, in, name);
- sendSync(exchange);
- if (exchange.getStatus() == ExchangeStatus.ERROR) {
- Exception e = exchange.getError();
- if (e == null) {
- e = new JBIException("Unkown error");
- }
- throw e;
- }
- } finally {
- if (in != null) {
- in.close();
- }
- }
+ String name = aFile.getCanonicalPath();
+ in = new BufferedInputStream(new FileInputStream(aFile));
+ InOnly exchange = getExchangeFactory().createInOnlyExchange();
+ configureExchangeTarget(exchange);
+ NormalizedMessage message = exchange.createMessage();
+ exchange.setInMessage(message);
+ marshaler.readMessage(exchange, message, in, name);
+ send(exchange);
+ this.openExchanges.put(exchange.getExchangeId(), aFile);
}
public String getLocationURI() {
@@ -298,8 +279,61 @@
}
public void process(MessageExchange exchange) throws Exception {
- // Do nothing. In our case, this method should never be called
- // as we only send synchronous InOnly exchange
+ // check for done or error
+ if (this.openExchanges.containsKey(exchange.getExchangeId())) {
+ File aFile = this.openExchanges.get(exchange.getExchangeId());
+
+ logger.debug("Releasing " + aFile.getAbsolutePath());
+ try {
+ // check for state
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ if (isDeleteFile()) {
+ if (archive != null) {
+ moveFile(aFile, archive);
+ } else {
+ if (!aFile.delete()) {
+ throw new IOException("Could not delete file " + aFile);
+ }
+ }
+ }
+ } else {
+ Exception e = exchange.getError();
+ if (e == null) {
+ e = new JBIException("Unkown error");
+ }
+ throw e;
+ }
+ } finally {
+ // remove the open exchange
+ openExchanges.remove(exchange.getExchangeId());
+ // unlock the file
+ unlockAsyncFile(aFile);
+ }
+
+ } else {
+ // strange, we don't know this exchange
+ logger.debug("Received unknown exchange. Will be ignored...");
+ return;
+ }
+ }
+
+ /**
+ * unlock the file
+ *
+ * @param file the file to unlock
+ */
+ private void unlockAsyncFile(File file) {
+ // finally remove the file from the open exchanges list
+ String uri = file.toURI().relativize(file.toURI()).toString();
+ Lock lock = lockManager.getLock(uri);
+ if (lock != null) {
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ // can't release the lock
+ logger.error(ex);
+ }
+ }
}
/**
@@ -314,5 +348,4 @@
throw new IOException("Failed to move " + src + " to " + targetDirectory);
}
}
-
}