NIFI-271
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/9a3b6bed Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/9a3b6bed Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/9a3b6bed Branch: refs/heads/NIFI-292 Commit: 9a3b6bed62e9962ff97f2c76055cd4d49705ff89 Parents: 6a70645 Author: joewitt <[email protected]> Authored: Mon Apr 27 13:43:35 2015 -0400 Committer: joewitt <[email protected]> Committed: Mon Apr 27 13:43:35 2015 -0400 ---------------------------------------------------------------------- .../distributed/cache/client/CommsSession.java | 16 +-- .../DistributedMapCacheClientService.java | 7 +- .../DistributedSetCacheClientService.java | 6 +- .../cache/client/SSLCommsSession.java | 25 ++-- .../cache/client/StandardCommsSession.java | 1 + .../additionalDetails.html | 60 +++++----- .../cache/server/AbstractCacheServer.java | 25 ++-- .../distributed/cache/server/CacheRecord.java | 12 +- .../distributed/cache/server/CacheServer.java | 3 +- .../cache/server/DistributedCacheServer.java | 3 +- .../cache/server/DistributedSetCacheServer.java | 13 ++- .../cache/server/EvictionPolicy.java | 24 ++-- .../cache/server/SetCacheServer.java | 25 ++-- .../server/map/DistributedMapCacheServer.java | 12 +- .../distributed/cache/server/map/MapCache.java | 4 + .../cache/server/map/MapCacheRecord.java | 19 ++-- .../cache/server/map/MapCacheServer.java | 113 ++++++++++--------- .../cache/server/map/MapPutResult.java | 5 +- .../cache/server/map/PersistentMapCache.java | 51 ++++----- .../cache/server/map/SimpleMapCache.java | 47 ++++---- .../cache/server/set/PersistentSetCache.java | 57 +++++----- .../distributed/cache/server/set/SetCache.java | 5 +- .../cache/server/set/SetCacheRecord.java | 15 +-- .../cache/server/set/SetCacheResult.java | 11 +- .../cache/server/set/SimpleSetCache.java | 41 +++---- .../additionalDetails.html | 62 +++++----- .../cache/server/TestServerAndClient.java | 9 +- .../nifi-http-context-map-api/pom.xml | 34 +++--- .../org/apache/nifi/http/HttpContextMap.java | 45 ++++---- .../nifi-http-context-map/pom.xml | 20 ++-- .../nifi/http/StandardHttpContextMap.java | 83 +++++++------- .../index.html | 36 +++--- .../nifi/ssl/StandardSSLContextService.java | 3 +- .../apache/nifi/ssl/SSLContextServiceTest.java | 4 +- 34 files changed, 461 insertions(+), 435 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java index f838c2f..c035485 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java @@ -27,20 +27,20 @@ import javax.net.ssl.SSLContext; public interface CommsSession extends Closeable { void setTimeout(final long value, final TimeUnit timeUnit); - + InputStream getInputStream() throws IOException; - + OutputStream getOutputStream() throws IOException; - + boolean isClosed(); - + void interrupt(); - + String getHostname(); - + int getPort(); - + long getTimeout(TimeUnit timeUnit); - + SSLContext getSSLContext(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index 92bda8f..51138b9 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -42,7 +42,7 @@ import org.apache.nifi.stream.io.DataOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SeeAlso(classNames={"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map " + "between nodes in a NiFi cluster") public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient { @@ -65,14 +65,14 @@ public class DistributedMapCacheClientService extends AbstractControllerService public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("If specified, indicates the SSL Context Service that is used to communicate with the " - + "remote server. If not specified, communications will not be encrypted") + + "remote server. If not specified, communications will not be encrypted") .required(false) .identifiesControllerService(SSLContextService.class) .build(); public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder() .name("Communications Timeout") .description("Specifies how long to wait when communicating with the remote server before determining that " - + "there is a communications failure if data cannot be sent or received") + + "there is a communications failure if data cannot be sent or received") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .defaultValue("30 secs") @@ -299,6 +299,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService } private static interface CommsAction<T> { + T execute(CommsSession commsSession) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java index 2de4ccb..63d59ca 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java @@ -42,7 +42,7 @@ import org.apache.nifi.stream.io.DataOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SeeAlso(classNames={"org.apache.nifi.distributed.cache.server.DistributedSetCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.DistributedSetCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) @CapabilityDescription("Provides the ability to communicate with a DistributedSetCacheServer. This can be used in order to share a Set " + "between nodes in a NiFi cluster") public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient { @@ -65,14 +65,14 @@ public class DistributedSetCacheClientService extends AbstractControllerService public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("If specified, indicates the SSL Context Service that is used to communicate with the " - + "remote server. If not specified, communications will not be encrypted") + + "remote server. If not specified, communications will not be encrypted") .required(false) .identifiesControllerService(SSLContextService.class) .build(); public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder() .name("Communications Timeout") .description("Specifices how long to wait when communicating with the remote server before determining " - + "that there is a communications failure if data cannot be sent or received") + + "that there is a communications failure if data cannot be sent or received") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .defaultValue("30 secs") http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java index 9b4b656..3d400bb 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java @@ -30,36 +30,37 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream; public class SSLCommsSession implements CommsSession { + private final SSLSocketChannel sslSocketChannel; private final SSLContext sslContext; private final String hostname; private final int port; - + private final SSLSocketChannelInputStream in; private final BufferedInputStream bufferedIn; - + private final SSLSocketChannelOutputStream out; private final BufferedOutputStream bufferedOut; - public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { + public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true); - + in = new SSLSocketChannelInputStream(sslSocketChannel); bufferedIn = new BufferedInputStream(in); - + out = new SSLSocketChannelOutputStream(sslSocketChannel); bufferedOut = new BufferedOutputStream(out); - + this.sslContext = sslContext; this.hostname = hostname; this.port = port; } - + @Override public void interrupt() { sslSocketChannel.interrupt(); } - + @Override public void close() throws IOException { sslSocketChannel.close(); @@ -84,23 +85,25 @@ public class SSLCommsSession implements CommsSession { public boolean isClosed() { return sslSocketChannel.isClosed(); } - + @Override public String getHostname() { return hostname; } - + @Override public int getPort() { return port; } + @Override public SSLContext getSSLContext() { return sslContext; } + @Override public long getTimeout(final TimeUnit timeUnit) { return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS); } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java index 1f1ff7e..b2a5c1d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java @@ -33,6 +33,7 @@ import org.apache.nifi.remote.io.socket.SocketChannelInputStream; import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; public class StandardCommsSession implements CommsSession { + private final SocketChannel socketChannel; private final String hostname; private final int port; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/additionalDetails.html index 4cde8c6..1568635 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/additionalDetails.html +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/additionalDetails.html @@ -1,35 +1,35 @@ <!DOCTYPE html> <html lang="en"> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<head> -<meta charset="utf-8" /> -<title>Distributed Map Cache Client Service</title> -<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> -</head> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>Distributed Map Cache Client Service</title> + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + </head> -<body> - <p> - Below is an example of how to create a client connection to your distributed map cache server. - Note that the identifier in this example is <code>cache-client</code>. If you are using this template - to create your own MapCacheClient service, replace the values in this template with values that are - suitable for your system. Possible options for <code>Server Hostname</code>, <code>Server Port</code>, - <code>Communications Timeout</code>, and <span style="font-style: italic;">SSL Context Service</span>. - </p> + <body> + <p> + Below is an example of how to create a client connection to your distributed map cache server. + Note that the identifier in this example is <code>cache-client</code>. If you are using this template + to create your own MapCacheClient service, replace the values in this template with values that are + suitable for your system. Possible options for <code>Server Hostname</code>, <code>Server Port</code>, + <code>Communications Timeout</code>, and <span style="font-style: italic;">SSL Context Service</span>. + </p> - <pre> + <pre> <?xml version="1.0" encoding="UTF-8" ?> <services> <service> @@ -40,6 +40,6 @@ <property name="Communications Timeout">30 secs</property> </service> </services> - </pre> -</body> + </pre> + </body> </html> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java index a6a2458..10f53b2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java @@ -51,7 +51,8 @@ public abstract class AbstractCacheServer implements CacheServer { private final int port; private final SSLContext sslContext; protected volatile boolean stopped = false; - private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();; + private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>(); + ; private volatile ServerSocketChannel serverSocketChannel; @@ -75,7 +76,7 @@ public abstract class AbstractCacheServer implements CacheServer { final SocketChannel socketChannel; try { socketChannel = serverSocketChannel.accept(); - logger.debug("Connected to {}", new Object[] { socketChannel }); + logger.debug("Connected to {}", new Object[]{socketChannel}); } catch (final IOException e) { if (!stopped) { logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString()); @@ -104,7 +105,7 @@ public abstract class AbstractCacheServer implements CacheServer { rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel); } } catch (IOException e) { - logger.error("Cannot create input and/or output streams for {}", new Object[] { identifier }, e); + logger.error("Cannot create input and/or output streams for {}", new Object[]{identifier}, e); if (logger.isDebugEnabled()) { logger.error("", e); } @@ -112,7 +113,7 @@ public abstract class AbstractCacheServer implements CacheServer { socketChannel.close(); } catch (IOException swallow) { } - + return; } try (final InputStream in = new BufferedInputStream(rawInputStream); @@ -127,12 +128,12 @@ public abstract class AbstractCacheServer implements CacheServer { continueComms = listen(in, out, versionNegotiator.getVersion()); } // client has issued 'close' - logger.debug("Client issued close on {}", new Object[] { socketChannel }); + logger.debug("Client issued close on {}", new Object[]{socketChannel}); } catch (final SocketTimeoutException e) { logger.debug("30 sec timeout reached", e); } catch (final IOException | HandshakeException e) { if (!stopped) { - logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() }); + logger.error("{} unable to communicate with remote peer {} due to {}", new Object[]{this, peer, e.toString()}); if (logger.isDebugEnabled()) { logger.error("", e); } @@ -161,7 +162,7 @@ public abstract class AbstractCacheServer implements CacheServer { @Override public void stop() throws IOException { stopped = true; - logger.info("Stopping CacheServer {}", new Object[] { this.identifier }); + logger.info("Stopping CacheServer {}", new Object[]{this.identifier}); if (serverSocketChannel != null && serverSocketChannel.isOpen()) { serverSocketChannel.close(); @@ -188,12 +189,12 @@ public abstract class AbstractCacheServer implements CacheServer { /** * Listens for incoming data and communicates with remote peer - * - * @param in - * @param out - * @param version + * + * @param in in + * @param out out + * @param version version * @return <code>true</code> if communications should continue, <code>false</code> otherwise - * @throws IOException + * @throws IOException ex */ protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java index 71ac56d..d7604cd 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java @@ -22,26 +22,26 @@ import java.util.concurrent.atomic.AtomicLong; public class CacheRecord { private static final AtomicLong idGenerator = new AtomicLong(0L); - + private final long id; private final long entryDate; private volatile long lastHitDate; private final AtomicInteger hitCount = new AtomicInteger(0); - + public CacheRecord() { entryDate = System.currentTimeMillis(); lastHitDate = entryDate; id = idGenerator.getAndIncrement(); } - + public long getEntryDate() { return entryDate; } - + public long getLastHitDate() { return lastHitDate; } - + public int getHitCount() { return hitCount.get(); } @@ -50,7 +50,7 @@ public class CacheRecord { hitCount.getAndIncrement(); lastHitDate = System.currentTimeMillis(); } - + public long getId() { return id; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java index 2c85cd8..fab8f13 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java @@ -21,6 +21,7 @@ import java.io.IOException; public interface CacheServer { void start() throws IOException; + void stop() throws IOException; - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java index f2e848f..5907f50 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java @@ -29,6 +29,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; public abstract class DistributedCacheServer extends AbstractControllerService { + public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used"; public static final String EVICTION_STRATEGY_LRU = "Least Recently Used"; public static final String EVICTION_STRATEGY_FIFO = "First In, First Out"; @@ -43,7 +44,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService { public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("If specified, this service will be used to create an SSL Context that will be used " - + "to secure communications; if not specified, communications will not be secure") + + "to secure communications; if not specified, communications will not be secure") .required(false) .identifiesControllerService(SSLContextService.class) .build(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java index 70e86c4..799baa3 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java @@ -25,6 +25,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService.ClientAuth; + @Tags({"distributed", "set", "distinct", "cache", "server"}) @CapabilityDescription("Provides a set (collection of unique values) cache that can be accessed over a socket. " + "Interaction with this service is typically accomplished via a DistributedSetCacheClient service.") @@ -37,14 +38,14 @@ public class DistributedSetCacheServer extends DistributedCacheServer { final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); - + final SSLContext sslContext; - if ( sslContextService == null ) { + if (sslContextService == null) { sslContext = null; } else { sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); } - + final EvictionPolicy evictionPolicy; switch (evictionPolicyName) { case EVICTION_STRATEGY_FIFO: @@ -59,14 +60,14 @@ public class DistributedSetCacheServer extends DistributedCacheServer { default: throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); } - + try { final File persistenceDir = persistencePath == null ? null : new File(persistencePath); - + return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); } catch (final Exception e) { throw new RuntimeException(e); } } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java index 60bd2c1..e6d577d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java @@ -19,37 +19,40 @@ package org.apache.nifi.distributed.cache.server; import java.util.Comparator; public enum EvictionPolicy { + LFU(new LFUComparator()), LRU(new LRUComparator()), FIFO(new FIFOComparator()); - + private final Comparator<CacheRecord> comparator; - + private EvictionPolicy(final Comparator<CacheRecord> comparator) { this.comparator = comparator; } - + public Comparator<CacheRecord> getComparator() { return comparator; } - + public static class LFUComparator implements Comparator<CacheRecord> { + @Override public int compare(final CacheRecord o1, final CacheRecord o2) { - if ( o1.equals(o2) ) { + if (o1.equals(o2)) { return 0; } - + final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount()); final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison; return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison); } } - + public static class LRUComparator implements Comparator<CacheRecord> { + @Override public int compare(final CacheRecord o1, final CacheRecord o2) { - if ( o1.equals(o2) ) { + if (o1.equals(o2)) { return 0; } @@ -57,11 +60,12 @@ public enum EvictionPolicy { return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison); } } - + public static class FIFOComparator implements Comparator<CacheRecord> { + @Override public int compare(final CacheRecord o1, final CacheRecord o2) { - if ( o1.equals(o2) ) { + if (o1.equals(o2)) { return 0; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java index d0abe5c..3dd224b 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java @@ -67,17 +67,17 @@ public class SetCacheServer extends AbstractCacheServer { final SetCacheResult response; switch (action) { - case "addIfAbsent": - response = cache.addIfAbsent(valueBuffer); - break; - case "contains": - response = cache.contains(valueBuffer); - break; - case "remove": - response = cache.remove(valueBuffer); - break; - default: - throw new IOException("IllegalRequest"); + case "addIfAbsent": + response = cache.addIfAbsent(valueBuffer); + break; + case "contains": + response = cache.contains(valueBuffer); + break; + case "remove": + response = cache.remove(valueBuffer); + break; + default: + throw new IOException("IllegalRequest"); } dos.writeBoolean(response.getResult()); @@ -97,8 +97,9 @@ public class SetCacheServer extends AbstractCacheServer { @Override protected void finalize() throws Throwable { - if (!stopped) + if (!stopped) { stop(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java index 0594dd4..dce7ccd 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java @@ -33,7 +33,7 @@ import org.apache.nifi.ssl.SSLContextService.ClientAuth; @Tags({"distributed", "cluster", "map", "cache", "server", "key/value"}) @CapabilityDescription("Provides a map (key/value) cache that can be accessed over a socket. Interaction with this service" + " is typically accomplished via a DistributedMapCacheClient service.") -@SeeAlso(classNames={"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.ssl.StandardSSLContextService"}) +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.ssl.StandardSSLContextService"}) public class DistributedMapCacheServer extends DistributedCacheServer { @Override @@ -43,14 +43,14 @@ public class DistributedMapCacheServer extends DistributedCacheServer { final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); - + final SSLContext sslContext; - if ( sslContextService == null ) { + if (sslContextService == null) { sslContext = null; } else { sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); } - + final EvictionPolicy evictionPolicy; switch (evictionPolicyName) { case EVICTION_STRATEGY_FIFO: @@ -65,10 +65,10 @@ public class DistributedMapCacheServer extends DistributedCacheServer { default: throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); } - + try { final File persistenceDir = persistencePath == null ? null : new File(persistencePath); - + return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); } catch (final Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java index 534cb0b..fad0adb 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@ -22,8 +22,12 @@ import java.nio.ByteBuffer; public interface MapCache { MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException; + boolean containsKey(ByteBuffer key) throws IOException; + ByteBuffer get(ByteBuffer key) throws IOException; + ByteBuffer remove(ByteBuffer key) throws IOException; + void shutdown() throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java index b0ab0c4..ff032b1 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java @@ -21,38 +21,39 @@ import java.nio.ByteBuffer; import org.apache.nifi.distributed.cache.server.CacheRecord; public class MapCacheRecord extends CacheRecord { + private final ByteBuffer key; private final ByteBuffer value; - + public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) { this.key = key; this.value = value; } - + public ByteBuffer getKey() { return key; } - + public ByteBuffer getValue() { return value; } - + @Override public int hashCode() { return 2938476 + key.hashCode() * value.hashCode(); } - + @Override public boolean equals(final Object obj) { - if ( obj == this ) { + if (obj == this) { return true; } - - if ( obj instanceof MapCacheRecord ) { + + if (obj instanceof MapCacheRecord) { final MapCacheRecord that = ((MapCacheRecord) obj); return key.equals(that.key) && value.equals(that.value); } - + return false; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java index e4a600e..943d6aa 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -55,63 +55,63 @@ public class MapCacheServer extends AbstractCacheServer { final String action = dis.readUTF(); try { switch (action) { - case "close": { - return false; - } - case "putIfAbsent": { - final byte[] key = readValue(dis); - final byte[] value = readValue(dis); - final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); - dos.writeBoolean(putResult.isSuccessful()); - break; - } - case "containsKey": { - final byte[] key = readValue(dis); - final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); - dos.writeBoolean(contains); - break; - } - case "getAndPutIfAbsent": { - final byte[] key = readValue(dis); - final byte[] value = readValue(dis); - - final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); - if (putResult.isSuccessful()) { - // Put was successful. There was no old value to get. - dos.writeInt(0); - } else { - // we didn't put. Write back the previous value - final byte[] byteArray = putResult.getExistingValue().array(); - dos.writeInt(byteArray.length); - dos.write(byteArray); + case "close": { + return false; } - - break; - } - case "get": { - final byte[] key = readValue(dis); - final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); - if (existingValue == null) { - // there was no existing value; we did a "put". - dos.writeInt(0); - } else { - // a value already existed. we did not update the map - final byte[] byteArray = existingValue.array(); - dos.writeInt(byteArray.length); - dos.write(byteArray); + case "putIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + dos.writeBoolean(putResult.isSuccessful()); + break; + } + case "containsKey": { + final byte[] key = readValue(dis); + final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); + dos.writeBoolean(contains); + break; + } + case "getAndPutIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); + + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + if (putResult.isSuccessful()) { + // Put was successful. There was no old value to get. + dos.writeInt(0); + } else { + // we didn't put. Write back the previous value + final byte[] byteArray = putResult.getExistingValue().array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + + break; + } + case "get": { + final byte[] key = readValue(dis); + final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); + if (existingValue == null) { + // there was no existing value; we did a "put". + dos.writeInt(0); + } else { + // a value already existed. we did not update the map + final byte[] byteArray = existingValue.array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + + break; + } + case "remove": { + final byte[] key = readValue(dis); + final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null; + dos.writeBoolean(removed); + break; + } + default: { + throw new IOException("Illegal Request"); } - - break; - } - case "remove": { - final byte[] key = readValue(dis); - final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null; - dos.writeBoolean(removed); - break; - } - default: { - throw new IOException("Illegal Request"); - } } } finally { dos.flush(); @@ -131,8 +131,9 @@ public class MapCacheServer extends AbstractCacheServer { @Override protected void finalize() throws Throwable { - if (!stopped) + if (!stopped) { stop(); + } } private byte[] readValue(final DataInputStream dis) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java index 29695eb..d0055f3 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java @@ -19,11 +19,12 @@ package org.apache.nifi.distributed.cache.server.map; import java.nio.ByteBuffer; public class MapPutResult { + private final boolean successful; private final ByteBuffer key, value; private final ByteBuffer existingValue; private final ByteBuffer evictedKey, evictedValue; - + public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) { this.successful = successful; this.key = key; @@ -44,7 +45,7 @@ public class MapPutResult { public ByteBuffer getValue() { return value; } - + public ByteBuffer getExistingValue() { return existingValue; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index 77fb77d..e821fbf 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -38,9 +38,9 @@ public class PersistentMapCache implements MapCache { private final MapCache wrapped; private final WriteAheadRepository<MapWaliRecord> wali; - + private final AtomicLong modifications = new AtomicLong(0L); - + public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException { wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); wrapped = cacheToWrap; @@ -48,8 +48,8 @@ public class PersistentMapCache implements MapCache { synchronized void restore() throws IOException { final Collection<MapWaliRecord> recovered = wali.recoverRecords(); - for ( final MapWaliRecord record : recovered ) { - if ( record.getUpdateType() == UpdateType.CREATE ) { + for (final MapWaliRecord record : recovered) { + if (record.getUpdateType() == UpdateType.CREATE) { wrapped.putIfAbsent(record.getKey(), record.getValue()); } } @@ -58,24 +58,24 @@ public class PersistentMapCache implements MapCache { @Override public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException { final MapPutResult putResult = wrapped.putIfAbsent(key, value); - if ( putResult.isSuccessful() ) { + if (putResult.isSuccessful()) { // The put was successful. final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value); final List<MapWaliRecord> records = new ArrayList<>(); records.add(record); - if ( putResult.getEvictedKey() != null ) { + if (putResult.getEvictedKey() != null) { records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue())); } - + wali.update(Collections.singletonList(record), false); - + final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 100000 == 0 ) { + if (modCount > 0 && modCount % 100000 == 0) { wali.checkpoint(); } } - + return putResult; } @@ -92,65 +92,64 @@ public class PersistentMapCache implements MapCache { @Override public ByteBuffer remove(ByteBuffer key) throws IOException { final ByteBuffer removeResult = wrapped.remove(key); - if ( removeResult != null ) { + if (removeResult != null) { final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult); final List<MapWaliRecord> records = new ArrayList<>(1); records.add(record); wali.update(records, false); - + final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 1000 == 0 ) { + if (modCount > 0 && modCount % 1000 == 0) { wali.checkpoint(); } } return removeResult; } - @Override public void shutdown() throws IOException { wali.shutdown(); } - private static class MapWaliRecord { + private final UpdateType updateType; private final ByteBuffer key; private final ByteBuffer value; - + public MapWaliRecord(final UpdateType updateType, final ByteBuffer key, final ByteBuffer value) { this.updateType = updateType; this.key = key; this.value = value; } - + public UpdateType getUpdateType() { return updateType; } - + public ByteBuffer getKey() { return key; } - + public ByteBuffer getValue() { return value; } } - + private static class Serde implements SerDe<MapWaliRecord> { @Override public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException { final UpdateType updateType = newRecordState.getUpdateType(); - if ( updateType == UpdateType.DELETE ) { + if (updateType == UpdateType.DELETE) { out.write(0); } else { out.write(1); } - + final byte[] key = newRecordState.getKey().array(); final byte[] value = newRecordState.getValue().array(); - + out.writeInt(key.length); out.write(key); out.writeInt(value.length); @@ -165,12 +164,12 @@ public class PersistentMapCache implements MapCache { @Override public MapWaliRecord deserializeEdit(final DataInputStream in, final Map<Object, MapWaliRecord> currentRecordStates, final int version) throws IOException { final int updateTypeValue = in.read(); - if ( updateTypeValue < 0 ) { + if (updateTypeValue < 0) { throw new EOFException(); } final UpdateType updateType = (updateTypeValue == 0 ? UpdateType.DELETE : UpdateType.CREATE); - + final int keySize = in.readInt(); final byte[] key = new byte[keySize]; in.readFully(key); @@ -207,4 +206,4 @@ public class PersistentMapCache implements MapCache { return 1; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java index 10139f1..9e8bbd1 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -33,46 +33,47 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SimpleMapCache implements MapCache { + private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class); private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>(); private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap; - + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); - + private final String serviceIdentifier; - + private final int maxSize; - + public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { // need to change to ConcurrentMap as this is modified when only the readLock is held inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator()); this.serviceIdentifier = serviceIdentifier; this.maxSize = maxSize; } - + @Override public String toString() { return "SimpleSetCache[service id=" + serviceIdentifier + "]"; } - // don't need synchronized because this method is only called when the writeLock is held, and all + // don't need synchronized because this method is only called when the writeLock is held, and all // public methods obtain either the read or write lock private MapCacheRecord evict() { - if ( cache.size() < maxSize ) { + if (cache.size() < maxSize) { return null; } - + final MapCacheRecord recordToEvict = inverseCacheMap.firstKey(); final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); cache.remove(valueToEvict); - - if ( logger.isDebugEnabled() ) { + + if (logger.isDebugEnabled()) { logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); } - + return recordToEvict; } @@ -81,44 +82,44 @@ public class SimpleMapCache implements MapCache { writeLock.lock(); try { final MapCacheRecord record = cache.get(key); - if ( record == null ) { + if (record == null) { // Record is null. We will add. final MapCacheRecord evicted = evict(); final MapCacheRecord newRecord = new MapCacheRecord(key, value); cache.put(key, newRecord); inverseCacheMap.put(newRecord, key); - - if ( evicted == null ) { + + if (evicted == null) { return new MapPutResult(true, key, value, null, null, null); } else { return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue()); } } - + // Record is not null. Increment hit count and return result indicating that record was not added. inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, key); - + return new MapPutResult(false, key, value, record.getValue(), null, null); } finally { writeLock.unlock(); } } - + @Override public boolean containsKey(final ByteBuffer key) { readLock.lock(); try { final MapCacheRecord record = cache.get(key); - if ( record == null ) { + if (record == null) { return false; } - + inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, key); - + return true; } finally { readLock.unlock(); @@ -130,14 +131,14 @@ public class SimpleMapCache implements MapCache { readLock.lock(); try { final MapCacheRecord record = cache.get(key); - if ( record == null ) { + if (record == null) { return null; } - + inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, key); - + return record.getValue(); } finally { readLock.unlock(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java index 4d75fc0..c2c3a41 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java @@ -38,34 +38,34 @@ public class PersistentSetCache implements SetCache { private final SetCache wrapped; private final WriteAheadRepository<SetRecord> wali; - + private final AtomicLong modifications = new AtomicLong(0L); - + public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException { wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); wrapped = cacheToWrap; } - + public synchronized void restore() throws IOException { final Collection<SetRecord> recovered = wali.recoverRecords(); - for ( final SetRecord record : recovered ) { - if ( record.getUpdateType() == UpdateType.CREATE ) { + for (final SetRecord record : recovered) { + if (record.getUpdateType() == UpdateType.CREATE) { addIfAbsent(record.getBuffer()); } } } - + @Override public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException { final SetCacheResult removeResult = wrapped.remove(value); - if ( removeResult.getResult() ) { + if (removeResult.getResult()) { final SetRecord record = new SetRecord(UpdateType.DELETE, value); final List<SetRecord> records = new ArrayList<>(); records.add(record); wali.update(records, false); - + final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 1000 == 0 ) { + if (modCount > 0 && modCount % 1000 == 0) { wali.checkpoint(); } } @@ -76,24 +76,24 @@ public class PersistentSetCache implements SetCache { @Override public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException { final SetCacheResult addResult = wrapped.addIfAbsent(value); - if ( addResult.getResult() ) { + if (addResult.getResult()) { final SetRecord record = new SetRecord(UpdateType.CREATE, value); final List<SetRecord> records = new ArrayList<>(); records.add(record); - + final SetCacheRecord evictedRecord = addResult.getEvictedRecord(); - if ( evictedRecord != null ) { + if (evictedRecord != null) { records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue())); } - + wali.update(records, false); - + final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 1000 == 0 ) { + if (modCount > 0 && modCount % 1000 == 0) { wali.checkpoint(); } } - + return addResult; } @@ -101,45 +101,46 @@ public class PersistentSetCache implements SetCache { public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException { return wrapped.contains(value); } - + @Override public void shutdown() throws IOException { wali.shutdown(); } - + private static class SetRecord { + private final UpdateType updateType; private final ByteBuffer value; - + public SetRecord(final UpdateType updateType, final ByteBuffer value) { this.updateType = updateType; this.value = value; } - + public UpdateType getUpdateType() { return updateType; } - + public ByteBuffer getBuffer() { return value; } - + public byte[] getData() { return value.array(); } } - + private static class Serde implements SerDe<SetRecord> { @Override public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException { final UpdateType updateType = newRecordState.getUpdateType(); - if ( updateType == UpdateType.DELETE ) { + if (updateType == UpdateType.DELETE) { out.write(0); } else { out.write(1); } - + final byte[] data = newRecordState.getData(); out.writeInt(data.length); out.write(newRecordState.getData()); @@ -153,16 +154,16 @@ public class PersistentSetCache implements SetCache { @Override public SetRecord deserializeEdit(final DataInputStream in, final Map<Object, SetRecord> currentRecordStates, final int version) throws IOException { final int value = in.read(); - if ( value < 0 ) { + if (value < 0) { throw new EOFException(); } final UpdateType updateType = (value == 0 ? UpdateType.DELETE : UpdateType.CREATE); - + final int size = in.readInt(); final byte[] data = new byte[size]; in.readFully(data); - + return new SetRecord(updateType, ByteBuffer.wrap(data)); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java index bf6ae3e..dd37d0c 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java @@ -22,8 +22,11 @@ import java.nio.ByteBuffer; public interface SetCache { SetCacheResult remove(ByteBuffer value) throws IOException; + SetCacheResult addIfAbsent(ByteBuffer value) throws IOException; + SetCacheResult contains(ByteBuffer value) throws IOException; + void shutdown() throws IOException; - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java index 20b6fae..5a75775 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java @@ -21,33 +21,34 @@ import java.nio.ByteBuffer; import org.apache.nifi.distributed.cache.server.CacheRecord; public class SetCacheRecord extends CacheRecord { + private final ByteBuffer value; - + public SetCacheRecord(final ByteBuffer value) { this.value = value; } - + public ByteBuffer getValue() { return value; } - + @Override public int hashCode() { return value.hashCode(); } - + @Override public boolean equals(final Object obj) { - if ( this == obj ) { + if (this == obj) { return true; } - + if (obj instanceof SetCacheRecord) { return value.equals(((SetCacheRecord) obj).value); } return false; } - + @Override public String toString() { return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]"; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java index 732c4f0..7faceb6 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java @@ -16,27 +16,26 @@ */ package org.apache.nifi.distributed.cache.server.set; - - public class SetCacheResult { + private final boolean result; private final SetCacheRecord stats; private final SetCacheRecord evictedRecord; - + public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) { this.result = result; this.stats = stats; this.evictedRecord = evictedRecord; } - + public boolean getResult() { return result; } - + public SetCacheRecord getRecord() { return stats; } - + public SetCacheRecord getEvictedRecord() { return evictedRecord; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a3b6bed/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java index 77d6481..bf69ba7 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java @@ -30,41 +30,42 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SimpleSetCache implements SetCache { + private static final Logger logger = LoggerFactory.getLogger(SimpleSetCache.class); - + private final Map<ByteBuffer, SetCacheRecord> cache = new HashMap<>(); private final SortedMap<SetCacheRecord, ByteBuffer> inverseCacheMap; - + private final String serviceIdentifier; - + private final int maxSize; - + public SimpleSetCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { inverseCacheMap = new TreeMap<>(evictionPolicy.getComparator()); this.serviceIdentifier = serviceIdentifier; this.maxSize = maxSize; } - + private synchronized SetCacheRecord evict() { - if ( cache.size() < maxSize ) { + if (cache.size() < maxSize) { return null; } - + final SetCacheRecord recordToEvict = inverseCacheMap.firstKey(); final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); cache.remove(valueToEvict); - - if ( logger.isDebugEnabled() ) { + + if (logger.isDebugEnabled()) { logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); } - + return recordToEvict; } - + @Override public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) { final SetCacheRecord record = cache.get(value); - if ( record == null ) { + if (record == null) { final SetCacheRecord evicted = evict(); final SetCacheRecord newRecord = new SetCacheRecord(value); cache.put(value, newRecord); @@ -75,42 +76,42 @@ public class SimpleSetCache implements SetCache { inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, value); - + return new SetCacheResult(false, record, null); } } - + @Override public synchronized SetCacheResult contains(final ByteBuffer value) { final SetCacheRecord record = cache.get(value); - if ( record == null ) { + if (record == null) { return new SetCacheResult(false, null, null); } else { // We have to remove the record and add it again in order to cause the Map to stay sorted inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, value); - + return new SetCacheResult(true, record, null); } } - + @Override public synchronized SetCacheResult remove(final ByteBuffer value) { final SetCacheRecord record = cache.remove(value); - if ( record == null ) { + if (record == null) { return new SetCacheResult(false, null, null); } else { inverseCacheMap.remove(record); return new SetCacheResult(true, record, null); } } - + @Override public String toString() { return "SimpleSetCache[service id=" + serviceIdentifier + "]"; } - + @Override public void shutdown() throws IOException { }
