Repository: camel
Updated Branches:
  refs/heads/master a29e33078 -> ff1c9598c


CAMEL-8750 Camel-Infinispan: Add Remove, RemoveAsync, Replace, ReplaceAsync operations for a specific value


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40ac1495
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40ac1495
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40ac1495

Branch: refs/heads/master
Commit: 40ac14959f0ed5f37d7a7f9e8396a082ecb83d2b
Parents: a29e330
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Wed May 6 08:53:43 2015 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Wed May 6 18:03:32 2015 +0200

----------------------------------------------------------------------
 .../infinispan/InfinispanConstants.java         |   1 +
 .../infinispan/InfinispanOperation.java         |  40 ++++-
 .../infinispan/InfinispanProducerTest.java      | 162 +++++++++++++++++++
 3 files changed, 197 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/40ac1495/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index 481446c..2dfe003 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -22,6 +22,7 @@ public interface InfinispanConstants {
     String CACHE_NAME = "CamelInfinispanCacheName";
     String KEY = "CamelInfinispanKey";
     String VALUE = "CamelInfinispanValue";
+    String OLD_VALUE = "CamelInfinispanOldValue";
     String MAP = "CamelInfinispanMap";    
     String OPERATION = "CamelInfinispanOperation";
     String PUT = "CamelInfinispanOperationPut";

http://git-wip-us.apache.org/repos/asf/camel/blob/40ac1495/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
index 38b3e74..efbd876 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
@@ -214,12 +214,24 @@ public class InfinispanOperation {
                         && 
!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT)))
 {
                         long maxIdle = 
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
                         String maxIdleTimeUnit =  
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, 
String.class);
-                        result = cache.replace(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, 
TimeUnit.valueOf(maxIdleTimeUnit));
+                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
+                            result = cache.replace(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, 
TimeUnit.valueOf(maxIdleTimeUnit));
+                        } else {
+                            result = cache.replace(getKey(exchange), 
getOldValue(exchange), getValue(exchange), lifespan, 
TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                        } 
                     } else {
-                        result = cache.replace(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
+                            result = cache.replace(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        } else {
+                            result = cache.replace(getKey(exchange), 
getOldValue(exchange), getValue(exchange), lifespan, 
TimeUnit.valueOf(timeUnit));
+                        }
                     }
                 } else {
-                    result = cache.replace(getKey(exchange), 
getValue(exchange));
+                    if (ObjectHelper.isEmpty(getOldValue(exchange))) {
+                        result = cache.replace(getKey(exchange), 
getValue(exchange));
+                    } else {
+                        result = cache.replace(getKey(exchange), 
getOldValue(exchange), getValue(exchange));
+                    }
                 }
                 setResult(result, exchange);
             }
@@ -234,12 +246,24 @@ public class InfinispanOperation {
                         && 
!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT)))
 {
                         long maxIdle = 
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
                         String maxIdleTimeUnit =  
exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, 
String.class);
-                        result = cache.replaceAsync(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, 
TimeUnit.valueOf(maxIdleTimeUnit));
+                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
+                            result = cache.replaceAsync(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, 
TimeUnit.valueOf(maxIdleTimeUnit));
+                        } else {
+                            result = cache.replaceAsync(getKey(exchange), 
getOldValue(exchange), getValue(exchange), lifespan, 
TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                        }
                     } else {
-                        result = cache.replaceAsync(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
+                            result = cache.replaceAsync(getKey(exchange), 
getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        } else {
+                            result = cache.replaceAsync(getKey(exchange), 
getOldValue(exchange), getValue(exchange), lifespan, 
TimeUnit.valueOf(timeUnit));
+                        }
                     }
                 } else {
-                    result = cache.replaceAsync(getKey(exchange), 
getValue(exchange));
+                    if (ObjectHelper.isEmpty(getOldValue(exchange))) {
+                        result = cache.replaceAsync(getKey(exchange), 
getValue(exchange));
+                    } else {
+                        result = cache.replaceAsync(getKey(exchange), 
getOldValue(exchange), getValue(exchange));
+                    }
                 }
                 setResult(result, exchange);
             }
@@ -268,6 +292,10 @@ public class InfinispanOperation {
             return exchange.getIn().getHeader(InfinispanConstants.VALUE);
         }
         
+        Object getOldValue(Exchange exchange) {
+            return exchange.getIn().getHeader(InfinispanConstants.OLD_VALUE);
+        }
+        
         Map<? extends Object, ? extends Object>  getMap(Exchange exchange) {
             return (Map<? extends Object, ? extends Object>) 
exchange.getIn().getHeader(InfinispanConstants.MAP);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/40ac1495/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
index c806847..8910493 100644
--- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
@@ -688,6 +688,89 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
         String resultGet = 
exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
         assertEquals(null, resultGet);
     }
+
+    @Test
+    public void replaceAValueByKeyWithOldValue() throws Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:replace", new Processor() 
{
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, 
VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, 
VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, 
InfinispanConstants.REPLACE);
+            }
+        });
+
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), true);
+        assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
+    }
+    
+    @Test
+    public void replaceAValueByKeyWithLifespanWithOldValue() throws Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:replace", new Processor() 
{
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, 
VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, 
VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, 
new Long(LIFESPAN_TIME));
+                
exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, 
InfinispanConstants.REPLACE);
+            }
+        });
+
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), true);
+        assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
+        
+        Thread.sleep(LIFESPAN_TIME * 1000);
+        
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = 
exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
+    public void replaceAValueByKeyWithLifespanAndMaxIdleTimeWithOldValue() 
throws Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:replace", new Processor() 
{
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, 
VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, 
VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, 
new Long(LIFESPAN_FOR_MAX_IDLE));
+                
exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, 
new Long(MAX_IDLE_TIME));
+                
exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, 
InfinispanConstants.REPLACE);
+            }
+        });
+
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), true);
+        assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
+        
+        Thread.sleep(10000);
+        
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = 
exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }    
+    
     
     @Test
     public void replaceAValueByKeyAsync() throws Exception {
@@ -764,6 +847,85 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
         String resultGet = 
exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
         assertEquals(null, resultGet);
     }
+    
+    @Test
+    public void replaceAValueByKeyAsyncWithOldValue() throws Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:replaceasync", new 
Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, 
VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, 
VALUE_TWO);
+            }
+        });
+
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), true);
+        assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
+    }
+    
+    @Test
+    public void replaceAValueByKeyWithLifespanAsyncWithOldValue() throws 
Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:replaceasync", new 
Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, 
VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, 
VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, 
new Long(LIFESPAN_TIME));
+                
exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+            }
+        });
+
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), true);
+        assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
+        
+        Thread.sleep(LIFESPAN_TIME * 1000);
+        
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = 
exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
+    public void 
replaceAValueByKeyWithLifespanAndMaxIdleTimeAsyncWithOldValue() throws 
Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:replaceasync", new 
Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, 
VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, 
VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, 
new Long(LIFESPAN_FOR_MAX_IDLE));
+                
exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, 
new Long(MAX_IDLE_TIME));
+                
exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, 
TimeUnit.SECONDS.toString());
+            }
+        });
+
+        assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, 
Boolean.class), true);
+        assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
+        
+        Thread.sleep(10000);
+        
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = 
exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
 
     @Test
     public void deletesExistingValueByKey() throws Exception {

Reply via email to