Repository: nifi Updated Branches: refs/heads/master 2fbe922a2 -> c59a96762
NIFI-4504, NIFI-4505 added removeAndGet, removeByPatternAndGet, and keySet methods to MapCache API cleaned up some warnings on deprecated nifi.stream.io classes This closes #2284. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c59a9676 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c59a9676 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c59a9676 Branch: refs/heads/master Commit: c59a967623da5c1dede58e4b8ff21ddadab913fe Parents: 2fbe922 Author: Mike Moser <[email protected]> Authored: Wed Nov 15 21:54:46 2017 +0000 Committer: Koji Kawamura <[email protected]> Committed: Fri Dec 29 00:23:09 2017 +0900 ---------------------------------------------------------------------- .../standard/TestDetectDuplicate.java | 7 +- .../cache/client/DistributedMapCacheClient.java | 45 +++++++++++++ .../DistributedMapCacheClientService.java | 71 ++++++++++++++++++++ .../DistributedSetCacheClientService.java | 4 +- .../cache/client/SSLCommsSession.java | 4 +- .../cache/client/StandardCommsSession.java | 4 +- .../cache/protocol/ProtocolHandshake.java | 1 + .../cache/server/SetCacheServer.java | 2 +- .../distributed/cache/server/map/MapCache.java | 3 + .../cache/server/map/MapCacheServer.java | 48 +++++++++++++ .../cache/server/map/PersistentMapCache.java | 6 ++ .../cache/server/map/SimpleMapCache.java | 11 +++ .../cache/server/TestServerAndClient.java | 48 +++++++++++++ 13 files changed, 246 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index ef0bd59..99c8ef5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -244,7 +244,12 @@ public class TestDetectDuplicate { @Override public long removeByPattern(String regex) throws IOException { - return exists ? 1L : 0L; + if (exists) { + exists = false; + return 1L; + } else { + return 0L; + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java index d2e0085..7fa6a61 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@ -167,6 +167,23 @@ public interface DistributedMapCacheClient extends ControllerService { <K> boolean remove(K key, Serializer<K> serializer) throws IOException; /** + * Removes the entry with the given key from the cache, if it is present, + * and returns the value that was removed from the map. + * + * @param <K> type of key + * @param <V> type of value + * @param key key + * @param keySerializer key serializer + * @param valueDeserializer value deserializer + * @return the value previously associated with the key, or null if there was no mapping + * null can also indicate that the map previously associated null with the key + * @throws IOException ex + */ + default <K, V> V removeAndGet(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { + throw new UnsupportedOperationException(); + } + + /** * Removes entries whose keys match the specified pattern * * @param regex The regular expression / pattern on which to match the keys to be removed @@ -174,4 +191,32 @@ public interface DistributedMapCacheClient extends ControllerService { * @throws IOException if any error occurred while removing an entry */ long removeByPattern(String regex) throws IOException; + + /** + * Removes entries whose keys match the specified pattern, and returns a map of entries that + * were removed. + * + * @param <K> type of key + * @param <V> type of value + * @param regex The regular expression / pattern on which to match the keys to be removed + * @param keyDeserializer key deserializer + * @param valueDeserializer value deserializer + * @return A map of key/value entries that were removed from the cache + * @throws IOException if any error occurred while removing an entry + */ + default <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Returns a set of all keys currently in the cache + * + * @param <K> type of key + * @param keyDeserializer key deserializer + * @return a Set of all keys currently in the cache + * @throws IOException ex + */ + default <K> Set<K> keySet(Deserializer<K> keyDeserializer) throws IOException { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index c454063..e655121 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -22,6 +22,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -256,6 +257,27 @@ public class DistributedMapCacheClientService extends AbstractControllerService } @Override + public <K, V> V removeAndGet(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { + return withCommsSession(new CommsAction<V>() { + @Override + public V execute(final CommsSession session) throws IOException { + validateProtocolVersion(session, 3); + + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("removeAndGet"); + + serialize(key, keySerializer, dos); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final byte[] responseBuffer = readLengthDelimitedResponse(dis); + return valueDeserializer.deserialize(responseBuffer); + } + }); + } + + @Override public long removeByPattern(String regex) throws IOException { return withCommsSession(session -> { final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); @@ -270,6 +292,34 @@ public class DistributedMapCacheClientService extends AbstractControllerService } @Override + public <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws IOException { + return withCommsSession(new CommsAction<Map<K, V>>() { + @Override + public Map<K, V> execute(CommsSession session) throws IOException { + validateProtocolVersion(session, 3); + + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("removeByPatternAndGet"); + dos.writeUTF(regex); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final int mapSize = dis.readInt(); + HashMap<K, V> resultMap = new HashMap<>(mapSize); + for (int i=0; i<mapSize; i++) { + final byte[] keyBuffer = readLengthDelimitedResponse(dis); + K key = keyDeserializer.deserialize(keyBuffer); + final byte[] valueBuffer = readLengthDelimitedResponse(dis); + V value = valueDeserializer.deserialize(valueBuffer); + resultMap.put(key, value); + } + return resultMap; + } + }); + } + + @Override @SuppressWarnings("unchecked") public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { return withCommsSession(session -> { @@ -321,6 +371,27 @@ public class DistributedMapCacheClientService extends AbstractControllerService }); } + @Override + public <K> Set<K> keySet(Deserializer<K> keyDeserializer) throws IOException { + return withCommsSession(session -> { + validateProtocolVersion(session, 3); + + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("keySet"); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final int setSize = dis.readInt(); + HashSet<K> resultSet = new HashSet<>(setSize); + for (int i=0; i<setSize; i++) { + final byte[] responseBuffer = readLengthDelimitedResponse(dis); + resultSet.add(keyDeserializer.deserialize(responseBuffer)); + } + return resultSet; + }); + } + private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException { final int responseLength = dis.readInt(); final byte[] responseBuffer = new byte[responseLength]; http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java index 34a0a7c..c35900e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.distributed.cache.client; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -40,8 +42,6 @@ import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService.ClientAuth; -import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.stream.io.DataOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java index 18ca571..3089eeb 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.distributed.cache.client; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -25,8 +27,6 @@ import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream; http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java index 7545bef..d157161 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.distributed.cache.client; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -25,8 +27,6 @@ import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.remote.io.InterruptableInputStream; import org.apache.nifi.remote.io.InterruptableOutputStream; import org.apache.nifi.remote.io.socket.SocketChannelInputStream; http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java index 3df2f09..3f2e26a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java @@ -38,6 +38,7 @@ public class ProtocolHandshake { * If the server doesn't support requested protocol version, HandshakeException will be thrown.</p> * * <p>DistributedMapCache version histories:<ul> + * <li>3: Added subMap, keySet, removeAndGet, removeByPatternAndGet methods.</li> * <li>2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.</li> * <li>1: Initial version.</li> * </ul></p> http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java index 3dd224b..3e6e7a9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java @@ -17,6 +17,7 @@ package org.apache.nifi.distributed.cache.server; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -29,7 +30,6 @@ import org.apache.nifi.distributed.cache.server.set.PersistentSetCache; import org.apache.nifi.distributed.cache.server.set.SetCache; import org.apache.nifi.distributed.cache.server.set.SetCacheResult; import org.apache.nifi.distributed.cache.server.set.SimpleSetCache; -import org.apache.nifi.stream.io.DataOutputStream; public class SetCacheServer extends AbstractCacheServer { http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java index bbffbf9..e007ff0 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Set; public interface MapCache { @@ -41,5 +42,7 @@ public interface MapCache { MapPutResult replace(MapCacheRecord record) throws IOException; + Set<ByteBuffer> keySet() throws IOException; + void shutdown() throws IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java index a0a01c1..57af28e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Map; +import java.util.Set; import javax.net.ssl.SSLContext; @@ -144,12 +145,47 @@ public class MapCacheServer extends AbstractCacheServer { dos.writeBoolean(removed); break; } + case "removeAndGet": { + final byte[] key = readValue(dis); + final ByteBuffer removed = cache.remove(ByteBuffer.wrap(key)); + if (removed == null) { + // there was no value removed + dos.writeInt(0); + } else { + // reply with the value that was removed + final byte[] byteArray = removed.array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + break; + } case "removeByPattern": { final String pattern = dis.readUTF(); final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern); dos.writeLong(removed == null ? 0 : removed.size()); break; } + case "removeByPatternAndGet": { + final String pattern = dis.readUTF(); + final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern); + if (removed == null || removed.size() == 0) { + dos.writeLong(0); + } else { + // write the map size + dos.writeInt(removed.size()); + for (Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) { + // write map entry key + final byte[] key = entry.getKey().array(); + dos.writeInt(key.length); + dos.write(key); + // write map entry value + final byte[] value = entry.getValue().array(); + dos.writeInt(value.length); + dos.write(value); + } + } + break; + } case "fetch": { final byte[] key = readValue(dis); final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key)); @@ -175,6 +211,18 @@ public class MapCacheServer extends AbstractCacheServer { dos.writeBoolean(result.isSuccessful()); break; } + case "keySet": { + final Set<ByteBuffer> result = cache.keySet(); + // write the set size + dos.writeInt(result.size()); + // write each key in the set + for (ByteBuffer bb : result) { + final byte[] byteArray = bb.array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + break; + } default: { throw new IOException("Illegal Request"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index 6bf6e5a..c1eebd6 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -171,6 +172,11 @@ public class PersistentMapCache implements MapCache { } @Override + public Set<ByteBuffer> keySet() throws IOException { + return wrapped.keySet(); + } + + @Override public void shutdown() throws IOException { wali.shutdown(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java index df78332..8571432 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.Lock; @@ -274,6 +275,16 @@ public class SimpleMapCache implements MapCache { } @Override + public Set<ByteBuffer> keySet() throws IOException { + readLock.lock(); + try { + return cache.keySet(); + } finally { + readLock.unlock(); + } + } + + @Override public void shutdown() throws IOException { } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java index 419a471..e2a74d4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java @@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SystemUtils; @@ -274,6 +275,13 @@ public class TestServerAndClient { assertTrue(contains); assertTrue(contains2); + final Deserializer<String> deserializer = new StringDeserializer(); + final Set<String> keys = client.keySet(deserializer); + assertEquals(3, keys.size()); + assertTrue(keys.contains("test")); + assertTrue(keys.contains("test2")); + assertTrue(keys.contains("test3")); + final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer); assertFalse(addedAgain); @@ -307,6 +315,19 @@ public class TestServerAndClient { assertFalse(client.containsKey("test.2", serializer)); assertTrue(client.containsKey("test3", serializer)); + // test removeByPatternAndGet + client.put("test.1", "1", serializer, serializer); + client.put("test.2", "2", serializer, serializer); + Map<String,String> removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer); + assertEquals(2, removed.size()); + assertTrue(removed.containsKey("test.1")); + assertTrue(removed.containsKey("test.2")); + assertFalse(client.containsKey("test.1", serializer)); + assertFalse(client.containsKey("test.2", serializer)); + assertTrue(client.containsKey("test3", serializer)); + removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer); + assertEquals(0, removed.size()); + newServer.shutdownServer(); client.close(); } @@ -437,6 +458,16 @@ public class TestServerAndClient { assertTrue(removed); LOGGER.debug("end remove"); + client.put("testKey", "testValue", keySerializer, valueSerializer); + assertTrue(client.containsKey("testKey", keySerializer)); + String removedValue = client.removeAndGet("testKey", keySerializer, deserializer); + assertEquals("testValue", removedValue); + removedValue = client.removeAndGet("testKey", keySerializer, deserializer); + assertNull(removedValue); + + final Set<String> keys = client.keySet(deserializer); + assertEquals(0, keys.size()); + // Test removeByPattern, the first two should be removed and the last should remain client.put("test.1", "1", keySerializer, keySerializer); client.put("test.2", "2", keySerializer, keySerializer); @@ -687,6 +718,23 @@ public class TestServerAndClient { } catch (UnsupportedOperationException e) { } + try { + Set<String> keys = client.keySet(stringDeserializer); + fail("Version 3 operations should NOT work."); + } catch (UnsupportedOperationException e) { + } + + try { + String removed = client.removeAndGet("v.*", stringSerializer, stringDeserializer); + fail("Version 3 operations should NOT work."); + } catch (UnsupportedOperationException e) { + } + + try { + Map<String, String> removed = client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer); + fail("Version 3 operations should NOT work."); + } catch (UnsupportedOperationException e) { + } client.close(); server.shutdownServer(); }
