Repository: nifi Updated Branches: refs/heads/master 0b7371556 -> d1ebddce9
NIFI-3627: Added removeByPattern() to DistributedMapCache interfaces NIFI-3627: Updated unit tests that use MapCache interface(s) Signed-off-by: Pierre Villard <[email protected]> This closes #1609. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1ebddce Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1ebddce Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1ebddce Branch: refs/heads/master Commit: d1ebddce988da81737120de530a9c1bb28d1683d Parents: 0b73715 Author: Matt Burgess <[email protected]> Authored: Tue Mar 21 18:15:54 2017 -0400 Committer: Pierre Villard <[email protected]> Committed: Wed Mar 22 19:22:08 2017 +0100 ---------------------------------------------------------------------- .../nifi/processors/hadoop/TestListHDFS.java | 20 +++++ .../org/apache/nifi/hbase/TestGetHBase.java | 19 +++++ .../standard/TestAbstractListProcessor.java | 18 +++++ .../standard/TestDetectDuplicate.java | 5 ++ .../standard/TestFetchDistributedMapCache.java | 21 +++++ .../nifi/processors/standard/TestNotify.java | 21 +++++ .../standard/TestPutDistributedMapCache.java | 21 +++++ .../cache/client/DistributedMapCacheClient.java | 8 ++ .../DistributedMapCacheClientService.java | 14 ++++ .../distributed/cache/server/map/MapCache.java | 3 + .../cache/server/map/MapCacheServer.java | 7 ++ .../cache/server/map/PersistentMapCache.java | 19 +++++ .../cache/server/map/SimpleMapCache.java | 30 +++++++ .../cache/server/TestServerAndClient.java | 82 +++++--------------- 14 files changed, 226 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index bdb058e..f0fce5a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -43,6 +43,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -51,6 +52,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; @@ -485,5 +488,22 @@ public class TestListHDFS { values.remove(key); return true; } + + @Override + public long removeByPattern(String regex) throws IOException { + verifyNotFail(); + final List<Object> removedRecords = new ArrayList<>(); + Pattern p = Pattern.compile(regex); + for (Object key : values.keySet()) { + // Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset + Matcher m = p.matcher(key.toString()); + if (m.matches()) { + removedRecords.add(values.get(key)); + } + } + final long numRemoved = removedRecords.size(); + removedRecords.forEach(values::remove); + return numRemoved; + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java index 8f6d890..24f83e9 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java @@ -33,6 +33,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.state.Scope; @@ -493,6 +495,23 @@ public class TestGetHBase { values.remove(key); return true; } + + @Override + public long removeByPattern(String regex) throws IOException { + verifyNotFail(); + final List<Object> removedRecords = new ArrayList<>(); + Pattern p = Pattern.compile(regex); + for (Object key : values.keySet()) { + // Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset + Matcher m = p.matcher(key.toString()); + if (m.matches()) { + removedRecords.add(values.get(key)); + } + } + final long numRemoved = removedRecords.size(); + removedRecords.forEach(values::remove); + return numRemoved; + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java index 9896396..ee7e237 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.io.Charsets; import org.apache.nifi.components.PropertyDescriptor; @@ -445,6 +447,22 @@ public class TestAbstractListProcessor { final Object value = stored.remove(key); return value != null; } + + @Override + public long removeByPattern(String regex) throws IOException { + final List<Object> removedRecords = new ArrayList<>(); + Pattern p = Pattern.compile(regex); + for (Object key : stored.keySet()) { + // Key must be backed by something that can be converted into a String + Matcher m = p.matcher(key.toString()); + if (m.matches()) { + removedRecords.add(stored.get(key)); + } + } + final long numRemoved = removedRecords.size(); + removedRecords.forEach(stored::remove); + return numRemoved; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index b23d56b..ef0bd59 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -243,6 +243,11 @@ public class TestDetectDuplicate { } @Override + public long removeByPattern(String regex) throws IOException { + return exists ? 1L : 0L; + } + + @Override public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { cacheValue = value; exists = true; http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java index cba4d65..549ad13 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java @@ -28,10 +28,14 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class TestFetchDistributedMapCache { @@ -210,6 +214,23 @@ public class TestFetchDistributedMapCache { values.remove(key); return true; } + + @Override + public long removeByPattern(String regex) throws IOException { + verifyNotFail(); + final List<Object> removedRecords = new ArrayList<>(); + Pattern p = Pattern.compile(regex); + for (Object key : values.keySet()) { + // Key must be backed by something that can be converted into a String + Matcher m = p.matcher(key.toString()); + if (m.matches()) { + removedRecords.add(values.get(key)); + } + } + final long numRemoved = removedRecords.size(); + removedRecords.forEach(values::remove); + return numRemoved; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java index 0494b18..2c5dbc1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java @@ -30,10 +30,14 @@ import org.junit.Test; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -375,6 +379,23 @@ public class TestNotify { } @Override + public long removeByPattern(String regex) throws IOException { + verifyNotFail(); + final List<Object> removedRecords = new ArrayList<>(); + Pattern p = Pattern.compile(regex); + for (Object key : values.keySet()) { + // Key must be backed by something that can be converted into a String + Matcher m = p.matcher(key.toString()); + if (m.matches()) { + removedRecords.add(values.get(key)); + } + } + final long numRemoved = removedRecords.size(); + removedRecords.forEach(values::remove); + return numRemoved; + } + + @Override @SuppressWarnings("unchecked") public <K, V> CacheEntry<K, V> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { verifyNotFail(); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java index 9bd649b..b6a9be1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java @@ -19,10 +19,14 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; @@ -271,6 +275,23 @@ public class TestPutDistributedMapCache { values.remove(key); return true; } + + @Override + public long removeByPattern(String regex) throws IOException { + verifyNotFail(); + final List<Object> removedRecords = new ArrayList<>(); + Pattern p = Pattern.compile(regex); + for (Object key : values.keySet()) { + // Key must be backed by something that can be converted into a String + Matcher m = p.matcher(key.toString()); + if (m.matches()) { + removedRecords.add(values.get(key)); + } + } + final long numRemoved = removedRecords.size(); + removedRecords.forEach(values::remove); + return numRemoved; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java index ea3bb63..e593f9d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@ -137,4 +137,12 @@ public interface DistributedMapCacheClient extends ControllerService { */ <K> boolean remove(K key, Serializer<K> serializer) throws IOException; + /** + * Removes entries whose keys match the specified pattern + * + * @param regex The regular expression / pattern on which to match the keys to be removed + * @return The number of entries that were removed + * @throws IOException if any error occurred while removing an entry + */ + long removeByPattern(String regex) throws IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index 5379bc1..81013f6 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -217,6 +217,20 @@ public class DistributedMapCacheClientService extends AbstractControllerService } @Override + public long removeByPattern(String regex) throws IOException { + return withCommsSession(session -> { + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("removeByPattern"); + dos.writeUTF(regex); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + return dis.readLong(); + }); + } + + @Override public <K, V> CacheEntry<K, V> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { return withCommsSession(session -> { validateProtocolVersion(session, 2); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java index 67f5bab..8bd9bdc 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server.map; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Map; public interface MapCache { @@ -31,6 +32,8 @@ public interface MapCache { ByteBuffer remove(ByteBuffer key) throws IOException; + Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws IOException; + MapCacheRecord fetch(ByteBuffer key) throws IOException; MapPutResult replace(MapCacheRecord record) throws IOException; http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java index 99eacd7..21090bc 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.Map; import javax.net.ssl.SSLContext; @@ -126,6 +127,12 @@ public class MapCacheServer extends AbstractCacheServer { dos.writeBoolean(removed); break; } + case "removeByPattern": { + final String pattern = dis.readUTF(); + final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern); + dos.writeLong(removed == null ? 0 : removed.size()); + break; + } case "fetch": { final byte[] key = readValue(dis); final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key)); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index da457bd..9f1375c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -128,6 +128,25 @@ public class PersistentMapCache implements MapCache { } @Override + public Map<ByteBuffer, ByteBuffer> removeByPattern(final String regex) throws IOException { + final Map<ByteBuffer, ByteBuffer> removeResult = wrapped.removeByPattern(regex); + if (removeResult != null) { + final List<MapWaliRecord> records = new ArrayList<>(removeResult.size()); + for(Map.Entry<ByteBuffer, ByteBuffer> entry : removeResult.entrySet()) { + final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, entry.getKey(), entry.getValue()); + records.add(record); + wali.update(records, false); + + final long modCount = modifications.getAndIncrement(); + if (modCount > 0 && modCount % 1000 == 0) { + wali.checkpoint(); + } + } + } + return removeResult; + } + + @Override public void shutdown() throws IOException { wali.shutdown(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java index ebcf91a..baa2d0f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -19,13 +19,17 @@ package org.apache.nifi.distributed.cache.server.map; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.nifi.distributed.cache.server.EvictionPolicy; @@ -182,6 +186,32 @@ public class SimpleMapCache implements MapCache { } @Override + public Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws IOException { + writeLock.lock(); + try { + final Map<ByteBuffer, ByteBuffer> removedMap = new HashMap<>(); + final List<MapCacheRecord> removedRecords = new ArrayList<>(); + Pattern p = Pattern.compile(regex); + for (ByteBuffer key : cache.keySet()) { + // Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset + Matcher m = p.matcher(new String(key.array())); + if (m.matches()) { + removedRecords.add(cache.get(key)); + } + } + removedRecords.forEach((record) -> { + cache.remove(record.getKey()); + inverseCacheMap.remove(record); + removedMap.put(record.getKey(), record.getValue()); + }); + + return removedMap; + } finally { + writeLock.unlock(); + } + } + + @Override public MapCacheRecord fetch(ByteBuffer key) throws IOException { readLock.lock(); try { http://git-wip-us.apache.org/repos/asf/nifi/blob/d1ebddce/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java index 0f5675c..7811ada 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang3.SerializationException; -import org.apache.commons.lang3.SystemUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Deserializer; @@ -50,7 +49,6 @@ import org.apache.nifi.util.MockConfigurationContext; import org.apache.nifi.util.MockControllerServiceInitializationContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.Assume; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; @@ -75,13 +73,6 @@ public class TestServerAndClient { @Test public void testNonPersistentSetServerAndClient() throws InitializationException, IOException { - /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug - * See: https://issues.apache.org/jira/browse/NIFI-437 - */ - Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); - LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); // Create server final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); @@ -111,12 +102,6 @@ public class TestServerAndClient { @Test public void testPersistentSetServerAndClient() throws InitializationException, IOException { - /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug - * See: https://issues.apache.org/jira/browse/NIFI-437 - */ - Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); @@ -169,12 +154,6 @@ public class TestServerAndClient { @Test public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException { - /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug - * See: https://issues.apache.org/jira/browse/NIFI-437 - */ - Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); // Create server @@ -237,12 +216,6 @@ public class TestServerAndClient { @Test public void testPersistentMapServerAndClientWithLFUEvictions() throws InitializationException, IOException { - /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug - * See: https://issues.apache.org/jira/browse/NIFI-437 - */ - Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); // Create server @@ -298,18 +271,22 @@ public class TestServerAndClient { assertFalse(client.containsKey("test3", serializer)); assertTrue(client.containsKey("test4", serializer)); + // Test removeByPattern, the first two should be removed and the last should remain + client.put("test.1", "1", serializer, serializer); + client.put("test.2", "2", serializer, serializer); + client.put("test3", "2", serializer, serializer); + final long removedTwo = client.removeByPattern("test\\..*"); + assertEquals(2L, removedTwo); + assertFalse(client.containsKey("test.1", serializer)); + assertFalse(client.containsKey("test.2", serializer)); + assertTrue(client.containsKey("test3", serializer)); + newServer.shutdownServer(); client.close(); } @Test public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException { - /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug - * See: https://issues.apache.org/jira/browse/NIFI-437 - */ - Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); @@ -379,12 +356,6 @@ public class TestServerAndClient { @Test public void testNonPersistentMapServerAndClient() throws InitializationException, IOException, InterruptedException { - /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug - * See: https://issues.apache.org/jira/browse/NIFI-437 - */ - Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); @@ -428,6 +399,16 @@ public class TestServerAndClient { assertTrue(removed); LOGGER.debug("end remove"); + // Test removeByPattern, the first two should be removed and the last should remain + client.put("test.1", "1", keySerializer, keySerializer); + client.put("test.2", "2", keySerializer, keySerializer); + client.put("test3", "2", keySerializer, keySerializer); + final long removedTwo = client.removeByPattern("test\\..*"); + assertEquals(2L, removedTwo); + assertFalse(client.containsKey("test.1", keySerializer)); + assertFalse(client.containsKey("test.2", keySerializer)); + assertTrue(client.containsKey("test3", keySerializer)); + final boolean containedAfterRemove = client.containsKey("testKey", keySerializer); assertFalse(containedAfterRemove); @@ -439,9 +420,6 @@ public class TestServerAndClient { } catch (final Exception e) { } - client = null; - clientInitContext = null; - clientContext = null; DistributedMapCacheClientService client2 = new DistributedMapCacheClientService(); MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2"); @@ -468,12 +446,6 @@ public class TestServerAndClient { @Test public void testClientTermination() throws InitializationException, IOException, InterruptedException { - /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug See: https://issues.apache.org/jira/browse/NIFI-437 - */ - Assume.assumeFalse("testClientTermination is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); - LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); // Create server final DistributedMapCacheServer server = new MapServer(); @@ -526,13 +498,6 @@ public class TestServerAndClient { @Test public void testOptimisticLock() throws Exception { - /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug - * See: https://issues.apache.org/jira/browse/NIFI-437 - */ - Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); - LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); // Create server @@ -608,13 +573,6 @@ public class TestServerAndClient { @Test public void testBackwardCompatibility() throws Exception { - /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug - * See: https://issues.apache.org/jira/browse/NIFI-437 - */ - Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); - LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
