NIFI-988: Test cases for PutDistributedMapCache

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ee7d89cb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ee7d89cb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ee7d89cb

Branch: refs/heads/master
Commit: ee7d89cb01d4661cfff2c4f0d093e38758680a56
Parents: 6b1328f
Author: Joe <[email protected]>
Authored: Wed Sep 23 14:32:37 2015 +0200
Committer: Joe <[email protected]>
Committed: Wed Sep 23 14:32:37 2015 +0200

----------------------------------------------------------------------
 .../standard/TestPutDistributedMapCache.java    | 280 +++++++++++++++++++
 1 file changed, 280 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ee7d89cb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
new file mode 100644
index 0000000..8347e7f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestPutDistributedMapCache {
+
+    private TestRunner runner;
+    private MockCacheClient service;
+    private PutDistributedMapCache processor;
+
+    @Before
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(PutDistributedMapCache.class);
+
+        service = new MockCacheClient();
+        runner.addControllerService("service", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutDistributedMapCache.DISTRIBUTED_CACHE_SERVICE, 
"service");
+    }
+
+    @Test
+    public void testNoCacheKey() throws InitializationException {
+
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, 
"${caheKeyAttribute}");
+        runner.enqueue(new byte[]{});
+
+        runner.run();
+
+        // no cache key attribute
+        
runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testSingleFlowFile() throws InitializationException, 
IOException {
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, 
"${caheKeyAttribute}");
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("caheKeyAttribute", "1");
+
+        String flowFileContent = "content";
+        runner.enqueue(flowFileContent.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_SUCCESS, 1);
+        byte[] value = service.get("1", new 
PutDistributedMapCache.StringSerializer(), new 
PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(flowFileContent, new String(value, "UTF-8"));
+
+        final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "true");
+        outputFlowFile.assertContentEquals(flowFileContent);
+        runner.clearTransferState();
+
+    }
+
+    @Test
+    public void testNothingToCache() throws InitializationException, 
IOException {
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, 
"${caheKeyAttribute}");
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("caheKeyAttribute", "2");
+
+        // flow file without content
+        runner.enqueue(new byte[]{}, props);
+
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testMaxCacheEntrySize() throws InitializationException, 
IOException {
+
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, 
"${uuid}");
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_MAX_BYTES, "10 
B");
+
+        // max length is 10 bytes, flow file content is 20 bytes
+        String flowFileContent = "contentwhichistoobig";
+        runner.enqueue(flowFileContent.getBytes("UTF-8"));
+
+        runner.run();
+
+        // no cache key attribute
+        
runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_FAILURE, 1);
+
+        final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_FAILURE).get(0);
+        outputFlowFile.assertAttributeNotExists("cached");
+        outputFlowFile.assertContentEquals(flowFileContent);
+
+
+        runner.clearTransferState();
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_MAX_BYTES, "1 
MB");
+    }
+
+    @Test
+    public void testCacheStrategyReplace() throws InitializationException, 
IOException {
+
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, 
"${caheKeyAttribute}");
+        runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, 
PutDistributedMapCache.CACHE_UPDATE_REPLACE.getValue());
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("caheKeyAttribute", "replaceme");
+
+        String original = "original";
+        runner.enqueue(original.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_SUCCESS, 1);
+
+        MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "true");
+        outputFlowFile.assertContentEquals(original);
+
+        runner.clearTransferState();
+        byte[] value = service.get("replaceme", new 
PutDistributedMapCache.StringSerializer(), new 
PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(original, new String(value, "UTF-8"));
+
+        String replaced = "replaced";
+        runner.enqueue(replaced.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_SUCCESS, 1);
+
+        outputFlowFile = 
runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "true");
+        outputFlowFile.assertContentEquals(replaced);
+
+        runner.clearTransferState();
+
+        //we expect that the cache entry is replaced
+        value = service.get("replaceme", new 
PutDistributedMapCache.StringSerializer(), new 
PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(replaced, new String(value, "UTF-8"));
+    }
+
+    @Test
+    public void testCacheStrategyKeepOriginal() throws 
InitializationException, IOException {
+
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, 
"${caheKeyAttribute}");
+        runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, 
PutDistributedMapCache.CACHE_UPDATE_KEEP_ORIGINAL.getValue());
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("caheKeyAttribute", "replaceme");
+
+        String original = "original";
+        runner.enqueue(original.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_SUCCESS, 1);
+
+        MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "true");
+        outputFlowFile.assertContentEquals(original);
+
+        runner.clearTransferState();
+        byte[] value = service.get("replaceme", new 
PutDistributedMapCache.StringSerializer(), new 
PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(original, new String(value, "UTF-8"));
+
+        String replaced = "replaced";
+        runner.enqueue(replaced.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_FAILURE, 1);
+
+        outputFlowFile = 
runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_FAILURE).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "false");
+        outputFlowFile.assertContentEquals(replaced);
+
+        runner.clearTransferState();
+
+        //we expect that the cache entry is NOT replaced
+        value = service.get("replaceme", new 
PutDistributedMapCache.StringSerializer(), new 
PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(original, new String(value, "UTF-8"));
+    }
+
+    private class MockCacheClient extends AbstractControllerService implements 
DistributedMapCacheClient {
+        private final ConcurrentMap<Object, Object> values = new 
ConcurrentHashMap<>();
+        private boolean failOnCalls = false;
+
+        private void verifyNotFail() throws IOException {
+            if ( failOnCalls ) {
+                throw new IOException("Could not call to remote service 
because Unit Test marked service unavailable");
+            }
+        }
+
+        @Override
+        public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+            verifyNotFail();
+            final Object retValue = values.putIfAbsent(key, value);
+            return (retValue == null);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+                                          final Deserializer<V> 
valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.putIfAbsent(key, value);
+        }
+
+        @Override
+        public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
+            verifyNotFail();
+            return values.containsKey(key);
+        }
+
+        @Override
+        public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            values.put(key, value);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V get(final K key, final Serializer<K> keySerializer, 
final Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.get(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public <K> boolean remove(final K key, final Serializer<K> serializer) 
throws IOException {
+            verifyNotFail();
+            values.remove(key);
+            return true;
+        }
+    }
+
+
+}
\ No newline at end of file

Reply via email to