Repository: nifi
Updated Branches:
  refs/heads/master 0b7371556 -> d1ebddce9


NIFI-3627: Added removeByPattern() to DistributedMapCache interfaces
NIFI-3627: Updated unit tests that use MapCache interface(s)

Signed-off-by: Pierre Villard <[email protected]>

This closes #1609.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1ebddce
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1ebddce
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1ebddce

Branch: refs/heads/master
Commit: d1ebddce988da81737120de530a9c1bb28d1683d
Parents: 0b73715
Author: Matt Burgess <[email protected]>
Authored: Tue Mar 21 18:15:54 2017 -0400
Committer: Pierre Villard <[email protected]>
Committed: Wed Mar 22 19:22:08 2017 +0100

----------------------------------------------------------------------
 .../nifi/processors/hadoop/TestListHDFS.java    | 20 +++++
 .../org/apache/nifi/hbase/TestGetHBase.java     | 19 +++++
 .../standard/TestAbstractListProcessor.java     | 18 +++++
 .../standard/TestDetectDuplicate.java           |  5 ++
 .../standard/TestFetchDistributedMapCache.java  | 21 +++++
 .../nifi/processors/standard/TestNotify.java    | 21 +++++
 .../standard/TestPutDistributedMapCache.java    | 21 +++++
 .../cache/client/DistributedMapCacheClient.java |  8 ++
 .../DistributedMapCacheClientService.java       | 14 ++++
 .../distributed/cache/server/map/MapCache.java  |  3 +
 .../cache/server/map/MapCacheServer.java        |  7 ++
 .../cache/server/map/PersistentMapCache.java    | 19 +++++
 .../cache/server/map/SimpleMapCache.java        | 30 +++++++
 .../cache/server/TestServerAndClient.java       | 82 +++++---------------
 14 files changed, 226 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index bdb058e..f0fce5a 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -43,6 +43,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -51,6 +52,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
@@ -485,5 +488,22 @@ public class TestListHDFS {
             values.remove(key);
             return true;
         }
+
+        @Override
+        public long removeByPattern(String regex) throws IOException {
+            verifyNotFail();
+            final List<Object> removedRecords = new ArrayList<>();
+            Pattern p = Pattern.compile(regex);
+            for (Object key : values.keySet()) {
+                // Key must be backed by something that array() returns a 
byte[] that can be converted into a String via the default charset
+                Matcher m = p.matcher(key.toString());
+                if (m.matches()) {
+                    removedRecords.add(values.get(key));
+                }
+            }
+            final long numRemoved = removedRecords.size();
+            removedRecords.forEach(values::remove);
+            return numRemoved;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
index 8f6d890..24f83e9 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.state.Scope;
@@ -493,6 +495,23 @@ public class TestGetHBase {
             values.remove(key);
             return true;
         }
+
+        @Override
+        public long removeByPattern(String regex) throws IOException {
+            verifyNotFail();
+            final List<Object> removedRecords = new ArrayList<>();
+            Pattern p = Pattern.compile(regex);
+            for (Object key : values.keySet()) {
+                // Key must be backed by something that array() returns a 
byte[] that can be converted into a String via the default charset
+                Matcher m = p.matcher(key.toString());
+                if (m.matches()) {
+                    removedRecords.add(values.get(key));
+                }
+            }
+            final long numRemoved = removedRecords.size();
+            removedRecords.forEach(values::remove);
+            return numRemoved;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
index 9896396..ee7e237 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.io.Charsets;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -445,6 +447,22 @@ public class TestAbstractListProcessor {
             final Object value = stored.remove(key);
             return value != null;
         }
+
+        @Override
+        public long removeByPattern(String regex) throws IOException {
+            final List<Object> removedRecords = new ArrayList<>();
+            Pattern p = Pattern.compile(regex);
+            for (Object key : stored.keySet()) {
+                // Key must be backed by something that can be converted into 
a String
+                Matcher m = p.matcher(key.toString());
+                if (m.matches()) {
+                    removedRecords.add(stored.get(key));
+                }
+            }
+            final long numRemoved = removedRecords.size();
+            removedRecords.forEach(stored::remove);
+            return numRemoved;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
index b23d56b..ef0bd59 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
@@ -243,6 +243,11 @@ public class TestDetectDuplicate {
         }
 
         @Override
+        public long removeByPattern(String regex) throws IOException {
+            return exists ? 1L : 0L;
+        }
+
+        @Override
         public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
             cacheValue = value;
             exists = true;

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
index cba4d65..549ad13 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
@@ -28,10 +28,14 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class TestFetchDistributedMapCache {
 
@@ -210,6 +214,23 @@ public class TestFetchDistributedMapCache {
             values.remove(key);
             return true;
         }
+
+        @Override
+        public long removeByPattern(String regex) throws IOException {
+            verifyNotFail();
+            final List<Object> removedRecords = new ArrayList<>();
+            Pattern p = Pattern.compile(regex);
+            for (Object key : values.keySet()) {
+                // Key must be backed by something that can be converted into 
a String
+                Matcher m = p.matcher(key.toString());
+                if (m.matches()) {
+                    removedRecords.add(values.get(key));
+                }
+            }
+            final long numRemoved = removedRecords.size();
+            removedRecords.forEach(values::remove);
+            return numRemoved;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
index 0494b18..2c5dbc1 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
@@ -30,10 +30,14 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -375,6 +379,23 @@ public class TestNotify {
         }
 
         @Override
+        public long removeByPattern(String regex) throws IOException {
+            verifyNotFail();
+            final List<Object> removedRecords = new ArrayList<>();
+            Pattern p = Pattern.compile(regex);
+            for (Object key : values.keySet()) {
+                // Key must be backed by something that can be converted into 
a String
+                Matcher m = p.matcher(key.toString());
+                if (m.matches()) {
+                    removedRecords.add(values.get(key));
+                }
+            }
+            final long numRemoved = removedRecords.size();
+            removedRecords.forEach(values::remove);
+            return numRemoved;
+        }
+
+        @Override
         @SuppressWarnings("unchecked")
         public <K, V> CacheEntry<K, V> fetch(K key, Serializer<K> 
keySerializer, Deserializer<V> valueDeserializer) throws IOException {
             verifyNotFail();

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
index 9bd649b..b6a9be1 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
@@ -19,10 +19,14 @@ package org.apache.nifi.processors.standard;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -271,6 +275,23 @@ public class TestPutDistributedMapCache {
             values.remove(key);
             return true;
         }
+
+        @Override
+        public long removeByPattern(String regex) throws IOException {
+            verifyNotFail();
+            final List<Object> removedRecords = new ArrayList<>();
+            Pattern p = Pattern.compile(regex);
+            for (Object key : values.keySet()) {
+                // Key must be backed by something that can be converted into 
a String
+                Matcher m = p.matcher(key.toString());
+                if (m.matches()) {
+                    removedRecords.add(values.get(key));
+                }
+            }
+            final long numRemoved = removedRecords.size();
+            removedRecords.forEach(values::remove);
+            return numRemoved;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
index ea3bb63..e593f9d 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@ -137,4 +137,12 @@ public interface DistributedMapCacheClient extends 
ControllerService {
      */
     <K> boolean remove(K key, Serializer<K> serializer) throws IOException;
 
+    /**
+     * Removes entries whose keys match the specified pattern
+     *
+     * @param regex The regular expression / pattern on which to match the 
keys to be removed
+     * @return The number of entries that were removed
+     * @throws IOException if any error occurred while removing an entry
+     */
+    long removeByPattern(String regex) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 5379bc1..81013f6 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -217,6 +217,20 @@ public class DistributedMapCacheClientService extends 
AbstractControllerService
     }
 
     @Override
+    public long removeByPattern(String regex) throws IOException {
+        return withCommsSession(session -> {
+            final DataOutputStream dos = new 
DataOutputStream(session.getOutputStream());
+            dos.writeUTF("removeByPattern");
+            dos.writeUTF(regex);
+            dos.flush();
+
+            // read response
+            final DataInputStream dis = new 
DataInputStream(session.getInputStream());
+            return dis.readLong();
+        });
+    }
+
+    @Override
     public <K, V> CacheEntry<K, V> fetch(final K key, final Serializer<K> 
keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
         return withCommsSession(session -> {
             validateProtocolVersion(session, 2);

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index 67f5bab..8bd9bdc 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server.map;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 public interface MapCache {
 
@@ -31,6 +32,8 @@ public interface MapCache {
 
     ByteBuffer remove(ByteBuffer key) throws IOException;
 
+    Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws 
IOException;
+
     MapCacheRecord fetch(ByteBuffer key) throws IOException;
 
     MapPutResult replace(MapCacheRecord record) throws IOException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index 99eacd7..21090bc 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import javax.net.ssl.SSLContext;
 
@@ -126,6 +127,12 @@ public class MapCacheServer extends AbstractCacheServer {
                 dos.writeBoolean(removed);
                 break;
             }
+            case "removeByPattern": {
+                final String pattern = dis.readUTF();
+                final Map<ByteBuffer, ByteBuffer> removed = 
cache.removeByPattern(pattern);
+                dos.writeLong(removed == null ? 0 : removed.size());
+                break;
+            }
             case "fetch": {
                 final byte[] key = readValue(dis);
                 final MapCacheRecord existing = 
cache.fetch(ByteBuffer.wrap(key));

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index da457bd..9f1375c 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -128,6 +128,25 @@ public class PersistentMapCache implements MapCache {
     }
 
     @Override
+    public Map<ByteBuffer, ByteBuffer> removeByPattern(final String regex) 
throws IOException {
+        final Map<ByteBuffer, ByteBuffer> removeResult = 
wrapped.removeByPattern(regex);
+        if (removeResult != null) {
+            final List<MapWaliRecord> records = new 
ArrayList<>(removeResult.size());
+            for(Map.Entry<ByteBuffer, ByteBuffer> entry : 
removeResult.entrySet()) {
+                final MapWaliRecord record = new 
MapWaliRecord(UpdateType.DELETE, entry.getKey(), entry.getValue());
+                records.add(record);
+                wali.update(records, false);
+
+                final long modCount = modifications.getAndIncrement();
+                if (modCount > 0 && modCount % 1000 == 0) {
+                    wali.checkpoint();
+                }
+            }
+        }
+        return removeResult;
+    }
+
+    @Override
     public void shutdown() throws IOException {
         wali.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index ebcf91a..baa2d0f 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@ -19,13 +19,17 @@ package org.apache.nifi.distributed.cache.server.map;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.nifi.distributed.cache.server.EvictionPolicy;
 
@@ -182,6 +186,32 @@ public class SimpleMapCache implements MapCache {
     }
 
     @Override
+    public Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws 
IOException {
+        writeLock.lock();
+        try {
+            final Map<ByteBuffer, ByteBuffer> removedMap = new HashMap<>();
+            final List<MapCacheRecord> removedRecords = new ArrayList<>();
+            Pattern p = Pattern.compile(regex);
+            for (ByteBuffer key : cache.keySet()) {
+                // Key must be backed by something that array() returns a 
byte[] that can be converted into a String via the default charset
+                Matcher m = p.matcher(new String(key.array()));
+                if (m.matches()) {
+                    removedRecords.add(cache.get(key));
+                }
+            }
+            removedRecords.forEach((record) -> {
+                cache.remove(record.getKey());
+                inverseCacheMap.remove(record);
+                removedMap.put(record.getKey(), record.getValue());
+            });
+
+            return removedMap;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
     public MapCacheRecord fetch(ByteBuffer key) throws IOException {
         readLock.lock();
         try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
index 0f5675c..7811ada 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SystemUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -50,7 +49,6 @@ import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.MockControllerServiceInitializationContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Assume;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -75,13 +73,6 @@ public class TestServerAndClient {
     @Test
     public void testNonPersistentSetServerAndClient() throws 
InitializationException, IOException {
 
-        /**
-         * This bypasses the test for build environments in OS X running Java 
1.8 due to a JVM bug
-         * See:  https://issues.apache.org/jira/browse/NIFI-437
-         */
-        Assume.assumeFalse("test is skipped due to build environment being OS 
X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437";,
-            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
-
         LOGGER.info("Testing " + 
Thread.currentThread().getStackTrace()[1].getMethodName());
         // Create server
         final TestRunner runner = 
TestRunners.newTestRunner(Mockito.mock(Processor.class));
@@ -111,12 +102,6 @@ public class TestServerAndClient {
 
     @Test
     public void testPersistentSetServerAndClient() throws 
InitializationException, IOException {
-        /**
-         * This bypasses the test for build environments in OS X running Java 
1.8 due to a JVM bug
-         * See:  https://issues.apache.org/jira/browse/NIFI-437
-         */
-        Assume.assumeFalse("test is skipped due to build environment being OS 
X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437";,
-            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
 
         LOGGER.info("Testing " + 
Thread.currentThread().getStackTrace()[1].getMethodName());
 
@@ -169,12 +154,6 @@ public class TestServerAndClient {
 
     @Test
     public void testPersistentSetServerAndClientWithLFUEvictions() throws 
InitializationException, IOException {
-        /**
-         * This bypasses the test for build environments in OS X running Java 
1.8 due to a JVM bug
-         * See:  https://issues.apache.org/jira/browse/NIFI-437
-         */
-        Assume.assumeFalse("test is skipped due to build environment being OS 
X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437";,
-            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
 
         LOGGER.info("Testing " + 
Thread.currentThread().getStackTrace()[1].getMethodName());
         // Create server
@@ -237,12 +216,6 @@ public class TestServerAndClient {
 
     @Test
     public void testPersistentMapServerAndClientWithLFUEvictions() throws 
InitializationException, IOException {
-        /**
-         * This bypasses the test for build environments in OS X running Java 
1.8 due to a JVM bug
-         * See:  https://issues.apache.org/jira/browse/NIFI-437
-         */
-        Assume.assumeFalse("test is skipped due to build environment being OS 
X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437";,
-            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
 
         LOGGER.info("Testing " + 
Thread.currentThread().getStackTrace()[1].getMethodName());
         // Create server
@@ -298,18 +271,22 @@ public class TestServerAndClient {
         assertFalse(client.containsKey("test3", serializer));
         assertTrue(client.containsKey("test4", serializer));
 
+        // Test removeByPattern, the first two should be removed and the last 
should remain
+        client.put("test.1", "1", serializer, serializer);
+        client.put("test.2", "2", serializer, serializer);
+        client.put("test3", "2", serializer, serializer);
+        final long removedTwo = client.removeByPattern("test\\..*");
+        assertEquals(2L, removedTwo);
+        assertFalse(client.containsKey("test.1", serializer));
+        assertFalse(client.containsKey("test.2", serializer));
+        assertTrue(client.containsKey("test3", serializer));
+
         newServer.shutdownServer();
         client.close();
     }
 
     @Test
     public void testPersistentSetServerAndClientWithFIFOEvictions() throws 
InitializationException, IOException {
-        /**
-         * This bypasses the test for build environments in OS X running Java 
1.8 due to a JVM bug
-         * See:  https://issues.apache.org/jira/browse/NIFI-437
-         */
-        Assume.assumeFalse("test is skipped due to build environment being OS 
X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437";,
-            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
 
         LOGGER.info("Testing " + 
Thread.currentThread().getStackTrace()[1].getMethodName());
 
@@ -379,12 +356,6 @@ public class TestServerAndClient {
 
     @Test
     public void testNonPersistentMapServerAndClient() throws 
InitializationException, IOException, InterruptedException {
-        /**
-         * This bypasses the test for build environments in OS X running Java 
1.8 due to a JVM bug
-         * See:  https://issues.apache.org/jira/browse/NIFI-437
-         */
-        Assume.assumeFalse("test is skipped due to build environment being OS 
X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437";,
-            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
 
         LOGGER.info("Testing " + 
Thread.currentThread().getStackTrace()[1].getMethodName());
 
@@ -428,6 +399,16 @@ public class TestServerAndClient {
         assertTrue(removed);
         LOGGER.debug("end remove");
 
+        // Test removeByPattern, the first two should be removed and the last 
should remain
+        client.put("test.1", "1", keySerializer, keySerializer);
+        client.put("test.2", "2", keySerializer, keySerializer);
+        client.put("test3", "2", keySerializer, keySerializer);
+        final long removedTwo = client.removeByPattern("test\\..*");
+        assertEquals(2L, removedTwo);
+        assertFalse(client.containsKey("test.1", keySerializer));
+        assertFalse(client.containsKey("test.2", keySerializer));
+        assertTrue(client.containsKey("test3", keySerializer));
+
         final boolean containedAfterRemove = client.containsKey("testKey", 
keySerializer);
         assertFalse(containedAfterRemove);
 
@@ -439,9 +420,6 @@ public class TestServerAndClient {
         } catch (final Exception e) {
 
         }
-        client = null;
-        clientInitContext = null;
-        clientContext = null;
 
         DistributedMapCacheClientService client2 = new 
DistributedMapCacheClientService();
         MockControllerServiceInitializationContext clientInitContext2 = new 
MockControllerServiceInitializationContext(client2, "client2");
@@ -468,12 +446,6 @@ public class TestServerAndClient {
     @Test
     public void testClientTermination() throws InitializationException, 
IOException, InterruptedException {
 
-        /**
-         * This bypasses the test for build environments in OS X running Java 
1.8 due to a JVM bug See: https://issues.apache.org/jira/browse/NIFI-437
-         */
-        Assume.assumeFalse("testClientTermination is skipped due to build 
environment being OS X with JDK 1.8. See 
https://issues.apache.org/jira/browse/NIFI-437";,
-            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
-
         LOGGER.info("Testing " + 
Thread.currentThread().getStackTrace()[1].getMethodName());
         // Create server
         final DistributedMapCacheServer server = new MapServer();
@@ -526,13 +498,6 @@ public class TestServerAndClient {
     @Test
     public void testOptimisticLock() throws Exception {
 
-        /**
-         * This bypasses the test for build environments in OS X running Java 
1.8 due to a JVM bug
-         * See:  https://issues.apache.org/jira/browse/NIFI-437
-         */
-        Assume.assumeFalse("test is skipped due to build environment being OS 
X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437";,
-                SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
-
         LOGGER.info("Testing " + 
Thread.currentThread().getStackTrace()[1].getMethodName());
 
         // Create server
@@ -608,13 +573,6 @@ public class TestServerAndClient {
     @Test
     public void testBackwardCompatibility() throws Exception {
 
-        /**
-         * This bypasses the test for build environments in OS X running Java 
1.8 due to a JVM bug
-         * See:  https://issues.apache.org/jira/browse/NIFI-437
-         */
-        Assume.assumeFalse("test is skipped due to build environment being OS 
X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437";,
-                SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
-
         LOGGER.info("Testing " + 
Thread.currentThread().getStackTrace()[1].getMethodName());
 
         final TestRunner runner = 
TestRunners.newTestRunner(Mockito.mock(Processor.class));

Reply via email to