Author: davsclaus
Date: Thu Dec  4 03:35:14 2008
New Revision: 723291

URL: http://svn.apache.org/viewvc?rev=723291&view=rev
Log:
CAMEL-1099: Added FileIdempotentRepositry

Added:
    
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/
    
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
   (with props)
    
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/
    
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
   (with props)
Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?rev=723291&r1=723290&r2=723291&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
 Thu Dec  4 03:35:14 2008
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.Scanner;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.util.LRUCache;
@@ -40,34 +41,53 @@
     private static final transient Log LOG = 
LogFactory.getLog(FileIdempotentRepository.class);
     private static final String STORE_DELIMITER = "\n";
     private Map<String, Object> cache;
-    private File store;
-    private long maxStoreSize = 1024 * 1000L; // 1mb store file
+    private File fileStore;
+    private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
+    private AtomicBoolean init = new AtomicBoolean();
 
-    public FileIdempotentRepository(final File store, final Map<String, 
Object> set) {
-        this.store = store;
+    public FileIdempotentRepository() {
+        // default use a 1st level cache 
+        this.cache = new LRUCache<String, Object>(1000);
+    }
+
+    public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
+        this.fileStore = fileStore;
         this.cache = set;
-        loadStore();
     }
 
     /**
      * Creates a new file based repository using a [EMAIL PROTECTED] 
org.apache.camel.util.LRUCache}
      * as 1st level cache with a default of 1000 entries in the cache.
      *
-     * @param store  the file store
+     * @param fileStore  the file store
      */
-    public static IdempotentRepository fileIdempotentRepository(File store) {
-        return fileIdempotentRepository(store, 1000);
+    public static IdempotentRepository fileIdempotentRepository(File 
fileStore) {
+        return fileIdempotentRepository(fileStore, 1000);
     }
 
     /**
      * Creates a new file based repository using a [EMAIL PROTECTED] 
org.apache.camel.util.LRUCache}
      * as 1st level cache.
      *
-     * @param store  the file store
+     * @param fileStore  the file store
      * @param cacheSize  the cache size
      */
-    public static IdempotentRepository fileIdempotentRepository(File store, 
int cacheSize) {
-        return fileIdempotentRepository(store, new LRUCache<String, 
Object>(cacheSize));
+    public static IdempotentRepository fileIdempotentRepository(File 
fileStore, int cacheSize) {
+        return fileIdempotentRepository(fileStore, new LRUCache<String, 
Object>(cacheSize));
+    }
+
+    /**
+     * Creates a new file based repository using a [EMAIL PROTECTED] 
org.apache.camel.util.LRUCache}
+     * as 1st level cache.
+     *
+     * @param fileStore  the file store
+     * @param cacheSize  the cache size
+     * @param maxFileStoreSize  the max size in bytes for the filestore file 
+     */
+    public static IdempotentRepository fileIdempotentRepository(File 
fileStore, int cacheSize, long maxFileStoreSize) {
+        FileIdempotentRepository repository = new 
FileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
+        repository.setMaxFileStoreSize(maxFileStoreSize);
+        return repository;
     }
 
     /**
@@ -86,11 +106,16 @@
 
     public boolean add(String messageId) {
         synchronized (cache) {
+            // init store if not loaded before
+            if (init.compareAndSet(false, true)) {
+                loadStore();
+            }
+
             if (cache.containsKey(messageId)) {
                 return false;
             } else {
                 cache.put(messageId, messageId);
-                if (store.length() < maxStoreSize) {
+                if (fileStore.length() < maxFileStoreSize) {
                     // just append to store
                     appendToStore(messageId);
                 } else {
@@ -105,16 +130,20 @@
 
     public boolean contains(String key) {
         synchronized (cache) {
+            // init store if not loaded before
+            if (init.compareAndSet(false, true)) {
+                loadStore();
+            }
             return cache.containsKey(key);
         }
     }
 
-    public File getStore() {
-        return store;
+    public File getFileStore() {
+        return fileStore;
     }
 
-    public void setStore(File store) {
-        this.store = store;
+    public void setFileStore(File fileStore) {
+        this.fileStore = fileStore;
     }
 
     public Map<String, Object> getCache() {
@@ -125,8 +154,8 @@
         this.cache = cache;
     }
 
-    public long getMaxStoreSize() {
-        return maxStoreSize;
+    public long getMaxFileStoreSize() {
+        return maxFileStoreSize;
     }
 
     /**
@@ -134,8 +163,18 @@
      * <p/>
      * The default is 1mb.
      */
-    public void setMaxStoreSize(long maxStoreSize) {
-        this.maxStoreSize = maxStoreSize;
+    public void setMaxFileStoreSize(long maxFileStoreSize) {
+        this.maxFileStoreSize = maxFileStoreSize;
+    }
+
+    /**
+     * Sets the cache size
+     */
+    public void setCacheSize(int size) {
+        if (cache != null) {
+            cache.clear();
+        }
+        cache = new LRUCache<String, Object>(size);
     }
 
     /**
@@ -145,11 +184,16 @@
      */
     protected void appendToStore(final String messageId) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Appending " + messageId + " to idempotent filestore: " 
+ store);
+            LOG.debug("Appending " + messageId + " to idempotent filestore: " 
+ fileStore);
         }
         FileOutputStream fos = null;
         try {
-            fos = new FileOutputStream(store, true);
+            // create store if missing
+            if (!fileStore.exists()) {
+                fileStore.createNewFile();
+            }
+            // append to store
+            fos = new FileOutputStream(fileStore, true);
             fos.write(messageId.getBytes());
             fos.write(STORE_DELIMITER.getBytes());
         } catch (IOException e) {
@@ -165,11 +209,11 @@
      */
     protected void trunkStore() {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Trunking idempotent filestore: " + store);
+            LOG.debug("Trunking idempotent filestore: " + fileStore);
         }
         FileOutputStream fos = null;
         try {
-            fos = new FileOutputStream(store);
+            fos = new FileOutputStream(fileStore);
             for (String key : cache.keySet()) {
                 fos.write(key.getBytes());
                 fos.write(STORE_DELIMITER.getBytes());
@@ -186,17 +230,17 @@
      */
     protected void loadStore() {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Loading to 1st level cache from idempotent filestore: " 
+ store);
+            LOG.trace("Loading to 1st level cache from idempotent filestore: " 
+ fileStore);
         }
 
-        if (!store.exists()) {
+        if (!fileStore.exists()) {
             return;
         }
 
         cache.clear();
         Scanner scanner = null;
         try {
-            scanner = new Scanner(store);
+            scanner = new Scanner(fileStore);
             scanner.useDelimiter(STORE_DELIMITER);
             while (scanner.hasNextLine()) {
                 String line = scanner.nextLine();
@@ -211,7 +255,7 @@
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Loaded " + cache.size() + " to the 1st level cache from 
idempotent filestore: " + store);
+            LOG.debug("Loaded " + cache.size() + " to the 1st level cache from 
idempotent filestore: " + fileStore);
         }
     }
 

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=723291&r1=723290&r2=723291&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
 Thu Dec  4 03:35:14 2008
@@ -33,6 +33,10 @@
 
     private Map<String, Object> cache;
 
+    public MemoryIdempotentRepository() {
+        this.cache = new LRUCache<String, Object>(1000);
+    }
+
     public MemoryIdempotentRepository(Map<String, Object> set) {
         this.cache = set;
     }
@@ -42,7 +46,7 @@
      * with a default of 1000 entries in the cache.
      */
     public static IdempotentRepository memoryIdempotentRepository() {
-        return memoryIdempotentRepository(1000);
+        return new MemoryIdempotentRepository();
     }
 
     /**

Added: 
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java?rev=723291&view=auto
==============================================================================
--- 
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
 (added)
+++ 
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
 Thu Dec  4 03:35:14 2008
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.idempotent;
+
+import java.io.File;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.component.file.FileComponent;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.IdempotentRepository;
+import static 
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class FileConsumerIdempotentTest extends ContextTestSupport {
+
+    private IdempotentRepository repo;
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, 
"org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml");
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/fileidempotent");
+
+        super.setUp();
+        repo = context.getRegistry().lookup("fileStore", 
IdempotentRepository.class);
+    }
+
+
+    public void testIdempotent() throws Exception {
+        assertFalse(repo.contains("report.txt"));
+
+        // send a file
+        template.sendBodyAndHeader("file://target/fileidempotent/", "Hello 
World", FileComponent.HEADER_FILE_NAME, "report.txt");
+
+        // consume the file the first time
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        assertMockEndpointsSatisfied();
+
+        // reset mock and set new expectations
+        mock.reset();
+        mock.expectedMessageCount(0);
+
+        // move file back
+        File file = new File("target/fileidempotent/done/report.txt");
+        File renamed = new File("target/fileidempotent/report.txt");
+        file = file.getAbsoluteFile();
+        file.renameTo(renamed.getAbsoluteFile());
+
+        // sleep to let the consumer try to poll the file
+        Thread.sleep(2000);
+
+        // should NOT consume the file again, let 2 secs pass to let the 
consumer try to consume it but it should not
+        assertMockEndpointsSatisfied();
+
+        assertTrue(repo.contains("report.txt"));
+    }
+
+}
+

Propchange: 
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/idempotent/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml?rev=723291&view=auto
==============================================================================
--- 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
 (added)
+++ 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
 Thu Dec  4 03:35:14 2008
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+       http://activemq.apache.org/camel/schema/spring 
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+    ">
+
+    <!-- START SNIPPET: example -->
+    <!-- this is our file based idempotent store configured to use the 
.filestore.dat as file -->
+    <bean id="fileStore" 
class="org.apache.camel.processor.idempotent.FileIdempotentRepository">
+        <!-- the filename for the store -->
+        <property name="fileStore" 
value="target/fileidempotent/.filestore.dat"/>
+        <!-- the max filesize in bytes for the file. Camel will trunk and 
flush the cache
+             if the file gets bigger -->
+        <property name="maxFileStoreSize" value="512000"/>
+        <!-- the number of elements in our store -->
+        <property name="cacheSize" value="250"/>
+    </bean>
+
+    <camelContext id="camel" 
xmlns="http://activemq.apache.org/camel/schema/spring";>
+        <route>
+            <from 
uri="file://target/fileidempotent/?idempotent=true&amp;idempotentRepositoryRef=fileStore&amp;moveNamePrefix=done/"/>
+            <to uri="mock:result"/>
+        </route>
+    </camelContext>
+    <!-- END SNIPPET: example -->
+</beans>

Propchange: 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/idempotent/fileConsumerIdempotentTest.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml


Reply via email to