Author: davsclaus
Date: Thu Dec 4 03:35:14 2008
New Revision: 723291
URL: http://svn.apache.org/viewvc?rev=723291&view=rev
Log:
CAMEL-1099: Added FileIdempotentRepository
Added:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
(with props)
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
(with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?rev=723291&r1=723290&r2=723291&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
Thu Dec 4 03:35:14 2008
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Scanner;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.LRUCache;
@@ -40,34 +41,53 @@
private static final transient Log LOG =
LogFactory.getLog(FileIdempotentRepository.class);
private static final String STORE_DELIMITER = "\n";
private Map<String, Object> cache;
- private File store;
- private long maxStoreSize = 1024 * 1000L; // 1mb store file
+ private File fileStore;
+ private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
+ private AtomicBoolean init = new AtomicBoolean();
- public FileIdempotentRepository(final File store, final Map<String,
Object> set) {
- this.store = store;
+ public FileIdempotentRepository() {
+ // default use a 1st level cache
+ this.cache = new LRUCache<String, Object>(1000);
+ }
+
+ public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
+ this.fileStore = fileStore;
this.cache = set;
- loadStore();
}
/**
* Creates a new file based repository using a {@link
org.apache.camel.util.LRUCache}
* as 1st level cache with a default of 1000 entries in the cache.
*
- * @param store the file store
+ * @param fileStore the file store
*/
- public static IdempotentRepository fileIdempotentRepository(File store) {
- return fileIdempotentRepository(store, 1000);
+ public static IdempotentRepository fileIdempotentRepository(File
fileStore) {
+ return fileIdempotentRepository(fileStore, 1000);
}
/**
* Creates a new file based repository using a {@link
org.apache.camel.util.LRUCache}
* as 1st level cache.
*
- * @param store the file store
+ * @param fileStore the file store
* @param cacheSize the cache size
*/
- public static IdempotentRepository fileIdempotentRepository(File store,
int cacheSize) {
- return fileIdempotentRepository(store, new LRUCache<String,
Object>(cacheSize));
+ public static IdempotentRepository fileIdempotentRepository(File
fileStore, int cacheSize) {
+ return fileIdempotentRepository(fileStore, new LRUCache<String,
Object>(cacheSize));
+ }
+
+ /**
+ * Creates a new file based repository using a {@link
org.apache.camel.util.LRUCache}
+ * as 1st level cache.
+ *
+ * @param fileStore the file store
+ * @param cacheSize the cache size
+ * @param maxFileStoreSize the max size in bytes for the filestore file
+ */
+ public static IdempotentRepository fileIdempotentRepository(File
fileStore, int cacheSize, long maxFileStoreSize) {
+ FileIdempotentRepository repository = new
FileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
+ repository.setMaxFileStoreSize(maxFileStoreSize);
+ return repository;
}
/**
@@ -86,11 +106,16 @@
public boolean add(String messageId) {
synchronized (cache) {
+ // init store if not loaded before
+ if (init.compareAndSet(false, true)) {
+ loadStore();
+ }
+
if (cache.containsKey(messageId)) {
return false;
} else {
cache.put(messageId, messageId);
- if (store.length() < maxStoreSize) {
+ if (fileStore.length() < maxFileStoreSize) {
// just append to store
appendToStore(messageId);
} else {
@@ -105,16 +130,20 @@
public boolean contains(String key) {
synchronized (cache) {
+ // init store if not loaded before
+ if (init.compareAndSet(false, true)) {
+ loadStore();
+ }
return cache.containsKey(key);
}
}
- public File getStore() {
- return store;
+ public File getFileStore() {
+ return fileStore;
}
- public void setStore(File store) {
- this.store = store;
+ public void setFileStore(File fileStore) {
+ this.fileStore = fileStore;
}
public Map<String, Object> getCache() {
@@ -125,8 +154,8 @@
this.cache = cache;
}
- public long getMaxStoreSize() {
- return maxStoreSize;
+ public long getMaxFileStoreSize() {
+ return maxFileStoreSize;
}
/**
@@ -134,8 +163,18 @@
* <p/>
* The default is 1mb.
*/
- public void setMaxStoreSize(long maxStoreSize) {
- this.maxStoreSize = maxStoreSize;
+ public void setMaxFileStoreSize(long maxFileStoreSize) {
+ this.maxFileStoreSize = maxFileStoreSize;
+ }
+
+ /**
+ * Sets the cache size
+ */
+ public void setCacheSize(int size) {
+ if (cache != null) {
+ cache.clear();
+ }
+ cache = new LRUCache<String, Object>(size);
}
/**
@@ -145,11 +184,16 @@
*/
protected void appendToStore(final String messageId) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Appending " + messageId + " to idempotent filestore: "
+ store);
+ LOG.debug("Appending " + messageId + " to idempotent filestore: "
+ fileStore);
}
FileOutputStream fos = null;
try {
- fos = new FileOutputStream(store, true);
+ // create store if missing
+ if (!fileStore.exists()) {
+ fileStore.createNewFile();
+ }
+ // append to store
+ fos = new FileOutputStream(fileStore, true);
fos.write(messageId.getBytes());
fos.write(STORE_DELIMITER.getBytes());
} catch (IOException e) {
@@ -165,11 +209,11 @@
*/
protected void trunkStore() {
if (LOG.isDebugEnabled()) {
- LOG.debug("Trunking idempotent filestore: " + store);
+ LOG.debug("Trunking idempotent filestore: " + fileStore);
}
FileOutputStream fos = null;
try {
- fos = new FileOutputStream(store);
+ fos = new FileOutputStream(fileStore);
for (String key : cache.keySet()) {
fos.write(key.getBytes());
fos.write(STORE_DELIMITER.getBytes());
@@ -186,17 +230,17 @@
*/
protected void loadStore() {
if (LOG.isTraceEnabled()) {
- LOG.trace("Loading to 1st level cache from idempotent filestore: "
+ store);
+ LOG.trace("Loading to 1st level cache from idempotent filestore: "
+ fileStore);
}
- if (!store.exists()) {
+ if (!fileStore.exists()) {
return;
}
cache.clear();
Scanner scanner = null;
try {
- scanner = new Scanner(store);
+ scanner = new Scanner(fileStore);
scanner.useDelimiter(STORE_DELIMITER);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
@@ -211,7 +255,7 @@
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded " + cache.size() + " to the 1st level cache from
idempotent filestore: " + store);
+ LOG.debug("Loaded " + cache.size() + " to the 1st level cache from
idempotent filestore: " + fileStore);
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=723291&r1=723290&r2=723291&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
Thu Dec 4 03:35:14 2008
@@ -33,6 +33,10 @@
private Map<String, Object> cache;
+ public MemoryIdempotentRepository() {
+ this.cache = new LRUCache<String, Object>(1000);
+ }
+
public MemoryIdempotentRepository(Map<String, Object> set) {
this.cache = set;
}
@@ -42,7 +46,7 @@
* with a default of 1000 entries in the cache.
*/
public static IdempotentRepository memoryIdempotentRepository() {
- return memoryIdempotentRepository(1000);
+ return new MemoryIdempotentRepository();
}
/**
Added:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java?rev=723291&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
(added)
+++
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
Thu Dec 4 03:35:14 2008
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.idempotent;
+
+import java.io.File;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.component.file.FileComponent;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.IdempotentRepository;
+import static
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class FileConsumerIdempotentTest extends ContextTestSupport {
+
+ private IdempotentRepository repo;
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this,
"org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml");
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/fileidempotent");
+
+ super.setUp();
+ repo = context.getRegistry().lookup("fileStore",
IdempotentRepository.class);
+ }
+
+
+ public void testIdempotent() throws Exception {
+ assertFalse(repo.contains("report.txt"));
+
+ // send a file
+ template.sendBodyAndHeader("file://target/fileidempotent/", "Hello
World", FileComponent.HEADER_FILE_NAME, "report.txt");
+
+ // consume the file the first time
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+
+ // reset mock and set new expectations
+ mock.reset();
+ mock.expectedMessageCount(0);
+
+ // move file back
+ File file = new File("target/fileidempotent/done/report.txt");
+ File renamed = new File("target/fileidempotent/report.txt");
+ file = file.getAbsoluteFile();
+ file.renameTo(renamed.getAbsoluteFile());
+
+ // sleep to let the consumer try to poll the file
+ Thread.sleep(2000);
+
+ // should NOT consume the file again, let 2 secs pass to let the
consumer try to consume it but it should not
+ assertMockEndpointsSatisfied();
+
+ assertTrue(repo.contains("report.txt"));
+ }
+
+}
+
Propchange:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml?rev=723291&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
(added)
+++
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
Thu Dec 4 03:35:14 2008
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+ http://activemq.apache.org/camel/schema/spring
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+ ">
+
+ <!-- START SNIPPET: example -->
+ <!-- this is our file based idempotent store configured to use the
.filestore.dat as file -->
+ <bean id="fileStore"
class="org.apache.camel.processor.idempotent.FileIdempotentRepository">
+ <!-- the filename for the store -->
+ <property name="fileStore"
value="target/fileidempotent/.filestore.dat"/>
+ <!-- the max filesize in bytes for the file. Camel will trunk and
flush the cache
+ if the file gets bigger -->
+ <property name="maxFileStoreSize" value="512000"/>
+ <!-- the number of elements in our store -->
+ <property name="cacheSize" value="250"/>
+ </bean>
+
+ <camelContext id="camel"
xmlns="http://activemq.apache.org/camel/schema/spring">
+ <route>
+ <from
uri="file://target/fileidempotent/?idempotent=true&amp;idempotentRepositoryRef=fileStore&amp;moveNamePrefix=done/"/>
+ <to uri="mock:result"/>
+ </route>
+ </camelContext>
+ <!-- END SNIPPET: example -->
+</beans>
Propchange:
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml