This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d23bf2ce4f NIFI-13596 Renamed Distributed Cache Server and Client Services
d23bf2ce4f is described below
commit d23bf2ce4fedcede87b7560bc4a552a814ef0771
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Oct 16 11:12:06 2024 -0500
NIFI-13596 Renamed Distributed Cache Server and Client Services
- Renamed DistributedMapCacheServer to MapCacheServer
- Renamed DistributedSetCacheServer to SetCacheServer
- Renamed DistributedMapCacheClientService to MapCacheClientService
- Renamed DistributedSetCacheClientService to SetCacheClientService
Signed-off-by: Pierre Villard <[email protected]>
This closes #9398.
---
.../util/list/TestAbstractListProcessor.java | 4 +-
.../processors/standard/TestDetectDuplicate.java | 32 ++++++------
...istributedCacheClient.java => CacheClient.java} | 17 +++----
.../client/CacheClientChannelPoolFactory.java | 4 +-
...ientService.java => MapCacheClientService.java} | 13 ++---
...apCacheClient.java => NettyMapCacheClient.java} | 4 +-
...etCacheClient.java => NettySetCacheClient.java} | 6 +--
...ientService.java => SetCacheClientService.java} | 13 ++---
.../org.apache.nifi.controller.ControllerService | 4 +-
...edCacheServer.java => AbstractCacheServer.java} | 2 +-
...utedSetCacheServer.java => SetCacheServer.java} | 2 +-
...utedMapCacheServer.java => MapCacheServer.java} | 8 +--
.../org.apache.nifi.controller.ControllerService | 4 +-
...cheTlsTest.java => MapCacheServiceTlsTest.java} | 22 ++++----
...tributedMapCacheTest.java => MapCacheTest.java} | 18 +++----
.../map/TestDistributedMapServerAndClient.java | 58 +++++++++++-----------
...Service.java => TestMapCacheClientService.java} | 14 +++---
...dSetCacheTest.java => SetCacheServiceTest.java} | 20 ++++----
.../set/TestDistributedSetServerAndClient.java | 46 ++++++++---------
.../TestDistributedMapCacheLookupService.java | 4 +-
20 files changed, 143 insertions(+), 152 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 68e8cb4be7..338f80dc56 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -217,7 +217,7 @@ public class TestAbstractListProcessor {
// Require a cache service.
runner.assertNotValid();
- final DistributedCache trackingCache = new DistributedCache();
+ final EphemeralMapCacheClientService trackingCache = new
EphemeralMapCacheClientService();
runner.addControllerService("tracking-cache", trackingCache);
runner.enableControllerService(trackingCache);
@@ -361,7 +361,7 @@ public class TestAbstractListProcessor {
String.format("Expected verification result to match pattern
[%s]. Actual explanation was: %s", expectedExplanationRegex,
result.getExplanation()));
}
- static class DistributedCache extends AbstractControllerService implements
DistributedMapCacheClient {
+ static class EphemeralMapCacheClientService extends
AbstractControllerService implements DistributedMapCacheClient {
private final Map<Object, Object> stored = new HashMap<>();
private int fetchCount = 0;
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
index f6b2c412ba..d8e1d2224d 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
@@ -20,7 +20,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.MapCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
@@ -42,9 +42,9 @@ public class TestDetectDuplicate {
@Test
public void testDuplicate() throws InitializationException {
final TestRunner runner =
TestRunners.newTestRunner(DetectDuplicate.class);
- final DistributedMapCacheClientImpl client = createClient();
+ final EphemeralMapCacheClientService client = createClient();
final Map<String, String> clientProperties = new HashMap<>();
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
"localhost");
+ clientProperties.put(MapCacheClientService.HOSTNAME.getName(),
"localhost");
runner.addControllerService("client", client, clientProperties);
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE,
"client");
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original
flow file");
@@ -68,9 +68,9 @@ public class TestDetectDuplicate {
public void testDuplicateWithAgeOff() throws InitializationException,
InterruptedException {
final TestRunner runner =
TestRunners.newTestRunner(DetectDuplicate.class);
- final DistributedMapCacheClientImpl client = createClient();
+ final EphemeralMapCacheClientService client = createClient();
final Map<String, String> clientProperties = new HashMap<>();
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
"localhost");
+ clientProperties.put(MapCacheClientService.HOSTNAME.getName(),
"localhost");
runner.addControllerService("client", client, clientProperties);
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE,
"client");
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original
flow file");
@@ -92,9 +92,9 @@ public class TestDetectDuplicate {
runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
}
- private DistributedMapCacheClientImpl createClient() throws
InitializationException {
+ private EphemeralMapCacheClientService createClient() throws
InitializationException {
- final DistributedMapCacheClientImpl client = new
DistributedMapCacheClientImpl();
+ final EphemeralMapCacheClientService client = new
EphemeralMapCacheClientService();
final ComponentLog logger = new MockComponentLog("client", client);
final MockControllerServiceInitializationContext clientInitContext =
new MockControllerServiceInitializationContext(client, "client", logger, new
MockStateManager(client));
client.initialize(clientInitContext);
@@ -105,9 +105,9 @@ public class TestDetectDuplicate {
@Test
public void testDuplicateNoCache() throws InitializationException {
final TestRunner runner =
TestRunners.newTestRunner(DetectDuplicate.class);
- final DistributedMapCacheClientImpl client = createClient();
+ final EphemeralMapCacheClientService client = createClient();
final Map<String, String> clientProperties = new HashMap<>();
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
"localhost");
+ clientProperties.put(MapCacheClientService.HOSTNAME.getName(),
"localhost");
runner.addControllerService("client", client, clientProperties);
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE,
"client");
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original
flow file");
@@ -141,9 +141,9 @@ public class TestDetectDuplicate {
public void testDuplicateNoCacheWithAgeOff() throws
InitializationException, InterruptedException {
final TestRunner runner =
TestRunners.newTestRunner(DetectDuplicate.class);
- final DistributedMapCacheClientImpl client = createClient();
+ final EphemeralMapCacheClientService client = createClient();
final Map<String, String> clientProperties = new HashMap<>();
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
"localhost");
+ clientProperties.put(MapCacheClientService.HOSTNAME.getName(),
"localhost");
runner.addControllerService("client", client, clientProperties);
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE,
"client");
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original
flow file");
@@ -168,7 +168,7 @@ public class TestDetectDuplicate {
runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
}
- static final class DistributedMapCacheClientImpl extends
AbstractControllerService implements DistributedMapCacheClient {
+ static final class EphemeralMapCacheClientService extends
AbstractControllerService implements DistributedMapCacheClient {
boolean exists = false;
private Object cacheValue;
@@ -180,10 +180,10 @@ public class TestDetectDuplicate {
@Override
protected java.util.List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(DistributedMapCacheClientService.HOSTNAME);
- props.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT);
- props.add(DistributedMapCacheClientService.PORT);
- props.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE);
+ props.add(MapCacheClientService.HOSTNAME);
+ props.add(MapCacheClientService.COMMUNICATIONS_TIMEOUT);
+ props.add(MapCacheClientService.PORT);
+ props.add(MapCacheClientService.SSL_CONTEXT_SERVICE);
return props;
}
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClient.java
similarity index 86%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClient.java
index 6a65e26ed1..c0372b462d 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClient.java
@@ -29,10 +29,9 @@ import org.apache.nifi.ssl.SSLContextService;
import java.io.IOException;
/**
- * Encapsulate operations which may be performed using a {@link
DistributedSetCacheClientService} or a
- * {@link DistributedMapCacheClientService}.
+ * Encapsulate operations which may be performed using a Cache Client Service
*/
-public class DistributedCacheClient {
+public class CacheClient {
private static final boolean DAEMON_THREAD_ENABLED = true;
@@ -54,12 +53,12 @@ public class DistributedCacheClient {
* @param factory creator of object used to broker the version
of the distributed cache protocol with the service
* @param identifier uniquely identifies this client
*/
- protected DistributedCacheClient(final String hostname,
- final int port,
- final int timeoutMillis,
- final SSLContextService sslContextService,
- final VersionNegotiatorFactory factory,
- final String identifier) {
+ protected CacheClient(final String hostname,
+ final int port,
+ final int timeoutMillis,
+ final SSLContextService sslContextService,
+ final VersionNegotiatorFactory factory,
+ final String identifier) {
final String poolName = String.format("%s[%s]",
getClass().getSimpleName(), identifier);
this.eventLoopGroup = new NioEventLoopGroup(new
DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED));
this.channelPool = new
CacheClientChannelPoolFactory().createChannelPool(
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
index 905de8dc52..c7a337f15e 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
@@ -33,9 +33,7 @@ import javax.net.ssl.SSLContext;
import java.time.Duration;
/**
- * Factory for construction of new {@link ChannelPool}, used by distributed
cache clients to invoke service
- * methods. Cache clients include the NiFi services {@link
DistributedSetCacheClientService}
- * and {@link DistributedMapCacheClientService}.
+ * Factory for construction of new {@link ChannelPool}, used by distributed
cache clients to invoke service methods.
*/
class CacheClientChannelPoolFactory {
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/MapCacheClientService.java
similarity index 95%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/MapCacheClientService.java
index f92d4634f5..d44c999508 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/MapCacheClientService.java
@@ -46,10 +46,10 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
@Tags({"distributed", "cache", "state", "map", "cluster"})
-@SeeAlso(classNames =
{"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.ssl.StandardSSLContextService"})
-@CapabilityDescription("Provides the ability to communicate with a
DistributedMapCacheServer. This can be used in order to share a Map "
+@SeeAlso(classNames =
{"org.apache.nifi.distributed.cache.server.map.MapCacheServer"})
+@CapabilityDescription("Provides the ability to communicate with a
MapCacheServer. This can be used in order to share a Map "
+ "between nodes in a NiFi cluster")
-public class DistributedMapCacheClientService extends
AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
+public class MapCacheClientService extends AbstractControllerService
implements AtomicDistributedMapCacheClient<Long> {
private static final long DEFAULT_CACHE_REVISION = 0L;
@@ -82,10 +82,7 @@ public class DistributedMapCacheClientService extends
AbstractControllerService
.defaultValue("30 secs")
.build();
- /**
- * The implementation of the business logic for {@link
DistributedMapCacheClientService}.
- */
- private volatile NettyDistributedMapCacheClient cacheClient = null;
+ private volatile NettyMapCacheClient cacheClient = null;
/**
* Creator of object used to broker the version of the distributed cache
protocol with the service.
@@ -107,7 +104,7 @@ public class DistributedMapCacheClientService extends
AbstractControllerService
getLogger().debug("Enabling Map Cache Client Service [{}]",
context.getName());
this.versionNegotiatorFactory = new StandardVersionNegotiatorFactory(
ProtocolVersion.V3.value(), ProtocolVersion.V2.value(),
ProtocolVersion.V1.value());
- this.cacheClient = new NettyDistributedMapCacheClient(
+ this.cacheClient = new NettyMapCacheClient(
context.getProperty(HOSTNAME).getValue(),
context.getProperty(PORT).asInteger(),
context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyMapCacheClient.java
similarity index 99%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyMapCacheClient.java
index 694d241549..a0b21390ba 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyMapCacheClient.java
@@ -39,7 +39,7 @@ import java.util.Set;
* The implementation of the {@link DistributedMapCacheClient} using the netty
library to provide the remote
* communication services.
*/
-public class NettyDistributedMapCacheClient extends DistributedCacheClient {
+public class NettyMapCacheClient extends CacheClient {
private final ComponentLog log;
/**
@@ -54,7 +54,7 @@ public class NettyDistributedMapCacheClient extends
DistributedCacheClient {
* @param identifier uniquely identifies this client
* @param log Component Log from instantiating Services
*/
- public NettyDistributedMapCacheClient(
+ public NettyMapCacheClient(
final String hostname,
final int port,
final int timeoutMillis,
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedSetCacheClient.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettySetCacheClient.java
similarity index 95%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedSetCacheClient.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettySetCacheClient.java
index 9bcc64a6d4..86a8947715 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedSetCacheClient.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettySetCacheClient.java
@@ -26,10 +26,10 @@ import org.apache.nifi.ssl.SSLContextService;
import java.io.IOException;
/**
- * The implementation of the {@link DistributedCacheClient} using the netty
library to provide the remote
+ * The implementation of the {@link CacheClient} using the netty library to
provide the remote
* communication services.
*/
-public class NettyDistributedSetCacheClient extends DistributedCacheClient {
+public class NettySetCacheClient extends CacheClient {
/**
* Constructor.
@@ -42,7 +42,7 @@ public class NettyDistributedSetCacheClient extends
DistributedCacheClient {
* @param factory creator of object used to broker the version
of the distributed cache protocol with the service
* @param identifier uniquely identifies this client
*/
- public NettyDistributedSetCacheClient(
+ public NettySetCacheClient(
final String hostname,
final int port,
final int timeoutMillis,
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SetCacheClientService.java
similarity index 92%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SetCacheClientService.java
index 19392ba51a..c706a7642c 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SetCacheClientService.java
@@ -37,10 +37,10 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
@Tags({"distributed", "cache", "state", "set", "cluster"})
-@SeeAlso(classNames =
{"org.apache.nifi.distributed.cache.server.DistributedSetCacheServer",
"org.apache.nifi.ssl.StandardSSLContextService"})
-@CapabilityDescription("Provides the ability to communicate with a
DistributedSetCacheServer. This can be used in order to share a Set "
+@SeeAlso(classNames =
{"org.apache.nifi.distributed.cache.server.SetCacheServer"})
+@CapabilityDescription("Provides the ability to communicate with a
SetCacheServer. This can be used in order to share a Set "
+ "between nodes in a NiFi cluster")
-public class DistributedSetCacheClientService extends
AbstractControllerService implements DistributedSetCacheClient {
+public class SetCacheClientService extends AbstractControllerService
implements DistributedSetCacheClient {
public static final PropertyDescriptor HOSTNAME = new
PropertyDescriptor.Builder()
.name("Server Hostname")
@@ -71,10 +71,7 @@ public class DistributedSetCacheClientService extends
AbstractControllerService
.defaultValue("30 secs")
.build();
- /**
- * The implementation of the business logic for {@link
DistributedSetCacheClientService}.
- */
- private volatile NettyDistributedSetCacheClient cacheClient = null;
+ private volatile NettySetCacheClient cacheClient = null;
/**
* Creator of object used to broker the version of the distributed cache
protocol with the service.
@@ -95,7 +92,7 @@ public class DistributedSetCacheClientService extends
AbstractControllerService
public void onEnabled(final ConfigurationContext context) {
getLogger().debug("Enabling Set Cache Client Service [{}]",
context.getName());
this.versionNegotiatorFactory = new
StandardVersionNegotiatorFactory(ProtocolVersion.V1.value());
- this.cacheClient = new NettyDistributedSetCacheClient(
+ this.cacheClient = new NettySetCacheClient(
context.getProperty(HOSTNAME).getValue(),
context.getProperty(PORT).asInteger(),
context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index a91f7ee4f4..53b1aeb26f 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService
-org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
\ No newline at end of file
+org.apache.nifi.distributed.cache.client.SetCacheClientService
+org.apache.nifi.distributed.cache.client.MapCacheClientService
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
similarity index 98%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
index 573d95454c..e674af8a4d 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -29,7 +29,7 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.RestrictedSSLContextService;
-public abstract class DistributedCacheServer extends AbstractControllerService
{
+public abstract class AbstractCacheServer extends AbstractControllerService {
public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
similarity index 97%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
index 201ef93455..8aa113e3cd 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
@@ -28,7 +28,7 @@ import org.apache.nifi.ssl.SSLContextService;
@Tags({"distributed", "set", "distinct", "cache", "server"})
@CapabilityDescription("Provides a set (collection of unique values) cache
that can be accessed over a socket. "
+ "Interaction with this service is typically accomplished via a
DistributedSetCacheClient service.")
-public class DistributedSetCacheServer extends DistributedCacheServer {
+public class SetCacheServer extends AbstractCacheServer {
@Override
protected CacheServer createCacheServer(final ConfigurationContext
context) {
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
similarity index 92%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index 8c3a83a485..a49f022409 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -24,16 +24,16 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.server.CacheServer;
-import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
+import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.ssl.SSLContextService;
@Tags({"distributed", "cluster", "map", "cache", "server", "key/value"})
@CapabilityDescription("Provides a map (key/value) cache that can be accessed
over a socket. Interaction with this service"
- + " is typically accomplished via a DistributedMapCacheClient
service.")
-@SeeAlso(classNames =
{"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService",
"org.apache.nifi.ssl.StandardSSLContextService"})
-public class DistributedMapCacheServer extends DistributedCacheServer {
+ + " is typically accomplished via a Map Cache Client Service.")
+@SeeAlso(classNames =
{"org.apache.nifi.distributed.cache.client.MapCacheClientService"})
+public class MapCacheServer extends AbstractCacheServer {
@Override
protected CacheServer createCacheServer(final ConfigurationContext
context) {
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 0509c7cca1..035a36c0a6 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.distributed.cache.server.DistributedSetCacheServer
-org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer
\ No newline at end of file
+org.apache.nifi.distributed.cache.server.SetCacheServer
+org.apache.nifi.distributed.cache.server.map.MapCacheServer
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheServiceTlsTest.java
similarity index 86%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheServiceTlsTest.java
index bcad46f581..ae9ee5a2e8 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheServiceTlsTest.java
@@ -18,7 +18,7 @@ package org.apache.nifi.distributed.cache.server.map;
import org.apache.commons.lang3.SerializationException;
import org.apache.nifi.distributed.cache.client.Deserializer;
-import
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.MapCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processor.Processor;
@@ -51,12 +51,12 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
-public class DistributedMapCacheTlsTest {
+public class MapCacheServiceTlsTest {
private static TestRunner runner = null;
private static SSLContextService sslContextService = null;
- private static DistributedMapCacheServer server = null;
- private static DistributedMapCacheClientService client = null;
+ private static MapCacheServer server = null;
+ private static MapCacheClientService client = null;
private static final Serializer<String> serializer = new
StringSerializer();
private static final Deserializer<String> deserializer = new
StringDeserializer();
@@ -67,18 +67,18 @@ public class DistributedMapCacheTlsTest {
runner.addControllerService(sslContextService.getIdentifier(),
sslContextService);
runner.enableControllerService(sslContextService);
- server = new DistributedMapCacheServer();
+ server = new MapCacheServer();
runner.addControllerService(server.getClass().getName(), server);
- runner.setProperty(server, DistributedMapCacheServer.PORT, "0");
- runner.setProperty(server,
DistributedMapCacheServer.SSL_CONTEXT_SERVICE,
sslContextService.getIdentifier());
+ runner.setProperty(server, MapCacheServer.PORT, "0");
+ runner.setProperty(server, MapCacheServer.SSL_CONTEXT_SERVICE,
sslContextService.getIdentifier());
runner.enableControllerService(server);
final int listeningPort = server.getPort();
- client = new DistributedMapCacheClientService();
+ client = new MapCacheClientService();
runner.addControllerService(client.getClass().getName(), client);
- runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME,
"localhost");
- runner.setProperty(client, DistributedMapCacheClientService.PORT,
String.valueOf(listeningPort));
- runner.setProperty(client,
DistributedMapCacheClientService.SSL_CONTEXT_SERVICE,
sslContextService.getIdentifier());
+ runner.setProperty(client, MapCacheClientService.HOSTNAME,
"localhost");
+ runner.setProperty(client, MapCacheClientService.PORT,
String.valueOf(listeningPort));
+ runner.setProperty(client, MapCacheClientService.SSL_CONTEXT_SERVICE,
sslContextService.getIdentifier());
runner.enableControllerService(client);
}
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheTest.java
similarity index 92%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheTest.java
index 8a0d5303d3..591e5707bc 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheTest.java
@@ -19,7 +19,7 @@ package org.apache.nifi.distributed.cache.server.map;
import org.apache.commons.lang3.SerializationException;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.Deserializer;
-import
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.MapCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processor.Processor;
@@ -45,11 +45,11 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(5)
-public class DistributedMapCacheTest {
+public class MapCacheTest {
private static TestRunner runner = null;
- private static DistributedMapCacheServer server = null;
- private static DistributedMapCacheClientService client = null;
+ private static MapCacheServer server = null;
+ private static MapCacheClientService client = null;
private static final Serializer<String> serializer = new
StringSerializer();
private static final Deserializer<String> deserializer = new
StringDeserializer();
@@ -57,16 +57,16 @@ public class DistributedMapCacheTest {
public static void startServices() throws Exception {
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- server = new DistributedMapCacheServer();
+ server = new MapCacheServer();
runner.addControllerService(server.getClass().getName(), server);
- runner.setProperty(server, DistributedMapCacheServer.PORT, "0");
+ runner.setProperty(server, MapCacheServer.PORT, "0");
runner.enableControllerService(server);
final int port = server.getPort();
- client = new DistributedMapCacheClientService();
+ client = new MapCacheClientService();
runner.addControllerService(client.getClass().getName(), client);
- runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME,
"localhost");
- runner.setProperty(client, DistributedMapCacheClientService.PORT,
String.valueOf(port));
+ runner.setProperty(client, MapCacheClientService.HOSTNAME,
"localhost");
+ runner.setProperty(client, MapCacheClientService.PORT,
String.valueOf(port));
runner.enableControllerService(client);
}
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
index 58c8506293..5e87d4e99d 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
@@ -22,12 +22,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.Deserializer;
-import
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.MapCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
import org.apache.nifi.distributed.cache.server.CacheServer;
-import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
+import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
@@ -76,7 +76,7 @@ public class TestDistributedMapServerAndClient {
private TestRunner runner;
- private DistributedMapCacheServer server;
+ private MapCacheServer server;
@BeforeEach
public void setRunner() throws InitializationException, IOException {
@@ -86,10 +86,10 @@ public class TestDistributedMapServerAndClient {
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- server = new DistributedMapCacheServer();
+ server = new MapCacheServer();
runner.addControllerService("server", server);
- runner.setProperty(server, DistributedMapCacheServer.PORT, "0");
+ runner.setProperty(server, MapCacheServer.PORT, "0");
}
@AfterEach
@@ -101,11 +101,11 @@ public class TestDistributedMapServerAndClient {
public void testNonPersistentMapServerAndClient() throws
InitializationException, IOException {
runner.enableControllerService(server);
- DistributedMapCacheClientService client = new
DistributedMapCacheClientService();
+ MapCacheClientService client = new MapCacheClientService();
try {
runner.addControllerService("client", client);
- runner.setProperty(client,
DistributedMapCacheClientService.HOSTNAME, "localhost");
- runner.setProperty(client, DistributedMapCacheClientService.PORT,
String.valueOf(server.getPort()));
+ runner.setProperty(client, MapCacheClientService.HOSTNAME,
"localhost");
+ runner.setProperty(client, MapCacheClientService.PORT,
String.valueOf(server.getPort()));
runner.enableControllerService(client);
final Serializer<String> valueSerializer = new StringSerializer();
@@ -150,18 +150,18 @@ public class TestDistributedMapServerAndClient {
public void testOptimisticLock() throws Exception {
runner.enableControllerService(server);
- DistributedMapCacheClientService client1 = new
DistributedMapCacheClientService();
+ MapCacheClientService client1 = new MapCacheClientService();
MockControllerServiceInitializationContext clientInitContext1 = new
MockControllerServiceInitializationContext(client1, "client1");
client1.initialize(clientInitContext1);
- DistributedMapCacheClientService client2 = new
DistributedMapCacheClientService();
+ MapCacheClientService client2 = new MapCacheClientService();
MockControllerServiceInitializationContext clientInitContext2 = new
MockControllerServiceInitializationContext(client2, "client2");
client2.initialize(clientInitContext2);
final Map<PropertyDescriptor, String> clientProperties = new
HashMap<>();
- clientProperties.put(DistributedMapCacheClientService.HOSTNAME,
"localhost");
- clientProperties.put(DistributedMapCacheClientService.PORT,
String.valueOf(server.getPort()));
-
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT,
"360 secs");
+ clientProperties.put(MapCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(MapCacheClientService.PORT,
String.valueOf(server.getPort()));
+ clientProperties.put(MapCacheClientService.COMMUNICATIONS_TIMEOUT,
"360 secs");
MockConfigurationContext clientContext1 = new
MockConfigurationContext(clientProperties,
clientInitContext1.getControllerServiceLookup(), null);
client1.onEnabled(clientContext1);
@@ -221,7 +221,7 @@ public class TestDistributedMapServerAndClient {
@Test
public void testBackwardCompatibility() throws Exception {
// Create a server that only supports protocol version 1.
- server = new DistributedMapCacheServer() {
+ server = new MapCacheServer() {
@Override
protected CacheServer createMapCacheServer(int port, int maxSize,
SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int
maxReadSize) throws IOException {
return new StandardMapCacheServer(getLogger(),
getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir,
maxReadSize) {
@@ -233,17 +233,17 @@ public class TestDistributedMapServerAndClient {
}
};
runner.addControllerService("server", server);
- runner.setProperty(server, DistributedMapCacheServer.PORT, "0");
+ runner.setProperty(server, MapCacheServer.PORT, "0");
runner.enableControllerService(server);
- DistributedMapCacheClientService client = new
DistributedMapCacheClientService();
+ MapCacheClientService client = new MapCacheClientService();
MockControllerServiceInitializationContext clientInitContext1 = new
MockControllerServiceInitializationContext(client, "client");
client.initialize(clientInitContext1);
final Map<PropertyDescriptor, String> clientProperties = new
HashMap<>();
- clientProperties.put(DistributedMapCacheClientService.HOSTNAME,
"localhost");
- clientProperties.put(DistributedMapCacheClientService.PORT,
String.valueOf(server.getPort()));
-
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT,
"360 secs");
+ clientProperties.put(MapCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(MapCacheClientService.PORT,
String.valueOf(server.getPort()));
+ clientProperties.put(MapCacheClientService.COMMUNICATIONS_TIMEOUT,
"360 secs");
MockConfigurationContext clientContext = new
MockConfigurationContext(clientProperties,
clientInitContext1.getControllerServiceLookup(), null);
client.onEnabled(clientContext);
@@ -276,12 +276,12 @@ public class TestDistributedMapServerAndClient {
public void testLimitServiceReadSize() throws InitializationException,
IOException {
runner.enableControllerService(server);
- final DistributedMapCacheClientService client =
createClient(server.getPort());
+ final MapCacheClientService client = createClient(server.getPort());
try {
final Serializer<String> serializer = new StringSerializer();
final String value = "value";
- final int maxReadSize = new
MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
+ final int maxReadSize = new
MockPropertyValue(AbstractCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
final int belowThreshold = maxReadSize / value.length();
final int aboveThreshold = belowThreshold + 1;
final String valueBelowThreshold = StringUtils.repeat(value,
belowThreshold);
@@ -300,12 +300,12 @@ public class TestDistributedMapServerAndClient {
final NettyEventServerFactory serverFactory = getEventServerFactory(0,
messages);
final EventServer eventServer = serverFactory.getEventServer();
- DistributedMapCacheClientService client = new
DistributedMapCacheClientService();
+ MapCacheClientService client = new MapCacheClientService();
runner.addControllerService("client", client);
- runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME,
"localhost");
- runner.setProperty(client, DistributedMapCacheClientService.PORT,
String.valueOf(eventServer.getListeningPort()));
- runner.setProperty(client,
DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "250 ms");
+ runner.setProperty(client, MapCacheClientService.HOSTNAME,
"localhost");
+ runner.setProperty(client, MapCacheClientService.PORT,
String.valueOf(eventServer.getListeningPort()));
+ runner.setProperty(client,
MapCacheClientService.COMMUNICATIONS_TIMEOUT, "250 ms");
runner.enableControllerService(client);
final Serializer<String> valueSerializer = new StringSerializer();
@@ -328,14 +328,14 @@ public class TestDistributedMapServerAndClient {
return factory;
}
- private DistributedMapCacheClientService createClient(final int port)
throws InitializationException {
- final DistributedMapCacheClientService client = new
DistributedMapCacheClientService();
+ private MapCacheClientService createClient(final int port) throws
InitializationException {
+ final MapCacheClientService client = new MapCacheClientService();
final MockControllerServiceInitializationContext clientInitContext =
new MockControllerServiceInitializationContext(client, "client");
client.initialize(clientInitContext);
final Map<PropertyDescriptor, String> clientProperties = new
HashMap<>();
- clientProperties.put(DistributedMapCacheClientService.HOSTNAME,
"localhost");
- clientProperties.put(DistributedMapCacheClientService.PORT,
String.valueOf(port));
+ clientProperties.put(MapCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(MapCacheClientService.PORT, String.valueOf(port));
final MockConfigurationContext clientContext = new
MockConfigurationContext(clientProperties,
clientInitContext.getControllerServiceLookup(), null);
client.onEnabled(clientContext);
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestMapCacheClientService.java
similarity index 87%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestMapCacheClientService.java
index 243cd8fc6b..06f9e788cd 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestMapCacheClientService.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.distributed.cache.server.map;
-import
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.MapCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.operations.MapOperation;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
@@ -48,7 +48,7 @@ import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertThrows;
-public class TestDistributedMapCacheClientService {
+public class TestMapCacheClientService {
private static final String LOCALHOST = "127.0.0.1";
private static final int MAX_REQUEST_LENGTH = 64;
@@ -91,13 +91,13 @@ public class TestDistributedMapCacheClientService {
*/
@Test
public void testClientTimeoutOnServerNetworkFailure() throws
InitializationException {
- final String clientId =
DistributedMapCacheClientService.class.getSimpleName();
- final DistributedMapCacheClientService clientService = new
DistributedMapCacheClientService();
+ final String clientId = MapCacheClientService.class.getSimpleName();
+ final MapCacheClientService clientService = new
MapCacheClientService();
runner.addControllerService(clientId, clientService);
- runner.setProperty(clientService,
DistributedMapCacheClientService.HOSTNAME, LOCALHOST);
- runner.setProperty(clientService,
DistributedMapCacheClientService.PORT, String.valueOf(port));
- runner.setProperty(clientService,
DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms");
+ runner.setProperty(clientService, MapCacheClientService.HOSTNAME,
LOCALHOST);
+ runner.setProperty(clientService, MapCacheClientService.PORT,
String.valueOf(port));
+ runner.setProperty(clientService,
MapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms");
runner.enableControllerService(clientService);
runner.assertValid();
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/SetCacheServiceTest.java
similarity index 81%
rename from
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
rename to
nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/SetCacheServiceTest.java
index 401b298804..4634187a61 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/SetCacheServiceTest.java
@@ -17,9 +17,9 @@
package org.apache.nifi.distributed.cache.server.set;
import org.apache.commons.lang3.SerializationException;
-import
org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
+import org.apache.nifi.distributed.cache.client.SetCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
+import org.apache.nifi.distributed.cache.server.SetCacheServer;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -35,27 +35,27 @@ import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class DistributedSetCacheTest {
+public class SetCacheServiceTest {
private static TestRunner runner = null;
- private static DistributedSetCacheServer server = null;
- private static DistributedSetCacheClientService client = null;
+ private static SetCacheServer server = null;
+ private static SetCacheClientService client = null;
private static final Serializer<String> serializer = new
StringSerializer();
@BeforeAll
public static void setRunner() throws Exception {
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- server = new DistributedSetCacheServer();
+ server = new SetCacheServer();
runner.addControllerService(server.getClass().getName(), server);
- runner.setProperty(server, DistributedSetCacheServer.PORT, "0");
+ runner.setProperty(server, SetCacheServer.PORT, "0");
runner.enableControllerService(server);
final int port = server.getPort();
- client = new DistributedSetCacheClientService();
+ client = new SetCacheClientService();
runner.addControllerService(client.getClass().getName(), client);
- runner.setProperty(client, DistributedSetCacheClientService.HOSTNAME,
"localhost");
- runner.setProperty(client, DistributedSetCacheClientService.PORT,
String.valueOf(port));
+ runner.setProperty(client, SetCacheClientService.HOSTNAME,
"localhost");
+ runner.setProperty(client, SetCacheClientService.PORT,
String.valueOf(port));
runner.enableControllerService(client);
}
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java
index d45abbb51e..9627890dd5 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java
@@ -20,10 +20,10 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
-import
org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
+import org.apache.nifi.distributed.cache.client.SetCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
-import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
+import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
+import org.apache.nifi.distributed.cache.server.SetCacheServer;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
@@ -55,7 +55,7 @@ public class TestDistributedSetServerAndClient {
private TestRunner runner;
- private DistributedSetCacheServer server;
+ private SetCacheServer server;
@BeforeEach
public void setRunner() throws InitializationException, IOException {
@@ -65,10 +65,10 @@ public class TestDistributedSetServerAndClient {
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- server = new DistributedSetCacheServer();
+ server = new SetCacheServer();
runner.addControllerService("server", server);
- runner.setProperty(server, DistributedSetCacheServer.PORT, "0");
+ runner.setProperty(server, SetCacheServer.PORT, "0");
}
@AfterEach
@@ -80,7 +80,7 @@ public class TestDistributedSetServerAndClient {
public void testNonPersistentSetServerAndClient() throws
InitializationException, IOException {
runner.enableControllerService(server);
- final DistributedSetCacheClientService client =
createClient(server.getPort());
+ final SetCacheClientService client = createClient(server.getPort());
try {
final Serializer<String> serializer = new StringSerializer();
final boolean added = client.addIfAbsent("test", serializer);
@@ -104,10 +104,10 @@ public class TestDistributedSetServerAndClient {
@Test
public void testPersistentSetServerAndClient() throws
InitializationException, IOException {
- runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH,
dataFile.getAbsolutePath());
+ runner.setProperty(server, SetCacheServer.PERSISTENCE_PATH,
dataFile.getAbsolutePath());
runner.enableControllerService(server);
- final DistributedSetCacheClientService client =
createClient(server.getPort());
+ final SetCacheClientService client = createClient(server.getPort());
try {
final Serializer<String> serializer = new StringSerializer();
final boolean added = client.addIfAbsent("test", serializer);
@@ -135,12 +135,12 @@ public class TestDistributedSetServerAndClient {
@Test
public void testPersistentSetServerAndClientWithLFUEvictions() throws
InitializationException, IOException {
- runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH,
dataFile.getAbsolutePath());
- runner.setProperty(server,
DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
- runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY,
DistributedSetCacheServer.EVICTION_STRATEGY_LFU);
+ runner.setProperty(server, SetCacheServer.PERSISTENCE_PATH,
dataFile.getAbsolutePath());
+ runner.setProperty(server, SetCacheServer.MAX_CACHE_ENTRIES, "3");
+ runner.setProperty(server, SetCacheServer.EVICTION_POLICY,
SetCacheServer.EVICTION_STRATEGY_LFU);
runner.enableControllerService(server);
- final DistributedSetCacheClientService client =
createClient(server.getPort());
+ final SetCacheClientService client = createClient(server.getPort());
try {
final Serializer<String> serializer = new StringSerializer();
final boolean added = client.addIfAbsent("test", serializer);
@@ -173,12 +173,12 @@ public class TestDistributedSetServerAndClient {
@Test
public void testPersistentSetServerAndClientWithFIFOEvictions() throws
InitializationException, IOException {
- runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH,
dataFile.getAbsolutePath());
- runner.setProperty(server,
DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
- runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY,
DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
+ runner.setProperty(server, SetCacheServer.PERSISTENCE_PATH,
dataFile.getAbsolutePath());
+ runner.setProperty(server, SetCacheServer.MAX_CACHE_ENTRIES, "3");
+ runner.setProperty(server, SetCacheServer.EVICTION_POLICY,
SetCacheServer.EVICTION_STRATEGY_FIFO);
runner.enableControllerService(server);
- final DistributedSetCacheClientService client =
createClient(server.getPort());
+ final SetCacheClientService client = createClient(server.getPort());
try {
final Serializer<String> serializer = new StringSerializer();
@@ -218,12 +218,12 @@ public class TestDistributedSetServerAndClient {
public void testLimitServiceReadSize() throws InitializationException,
IOException {
runner.enableControllerService(server);
- final DistributedSetCacheClientService client =
createClient(server.getPort());
+ final SetCacheClientService client = createClient(server.getPort());
try {
final Serializer<String> serializer = new StringSerializer();
final String value = "value";
- final int maxReadSize = new
MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
+ final int maxReadSize = new
MockPropertyValue(AbstractCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
final int belowThreshold = maxReadSize / value.length();
final int aboveThreshold = belowThreshold + 1;
final String valueBelowThreshold = StringUtils.repeat(value,
belowThreshold);
@@ -243,14 +243,14 @@ public class TestDistributedSetServerAndClient {
}
}
- private DistributedSetCacheClientService createClient(final int port)
throws InitializationException {
- final DistributedSetCacheClientService client = new
DistributedSetCacheClientService();
+ private SetCacheClientService createClient(final int port) throws
InitializationException {
+ final SetCacheClientService client = new SetCacheClientService();
final MockControllerServiceInitializationContext clientInitContext =
new MockControllerServiceInitializationContext(client, "client");
client.initialize(clientInitContext);
final Map<PropertyDescriptor, String> clientProperties = new
HashMap<>();
- clientProperties.put(DistributedSetCacheClientService.HOSTNAME,
"localhost");
- clientProperties.put(DistributedSetCacheClientService.PORT,
String.valueOf(port));
+ clientProperties.put(SetCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(SetCacheClientService.PORT, String.valueOf(port));
final MockConfigurationContext clientContext = new
MockConfigurationContext(clientProperties,
clientInitContext.getControllerServiceLookup(), null);
client.onEnabled(clientContext);
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java
b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java
index 7f878ce2bd..0c49296cfe 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java
@@ -44,7 +44,7 @@ public class TestDistributedMapCacheLookupService {
public void testDistributedMapCacheLookupService() throws
InitializationException {
final TestRunner runner =
TestRunners.newTestRunner(TestProcessor.class);
final DistributedMapCacheLookupService service = new
DistributedMapCacheLookupService();
- final DistributedMapCacheClient client = new
DistributedMapCacheClientImpl();
+ final DistributedMapCacheClient client = new
EphemeralMapCacheClientService();
runner.addControllerService("client", client);
runner.addControllerService("lookup-service", service);
@@ -62,7 +62,7 @@ public class TestDistributedMapCacheLookupService {
assertEquals(EMPTY_STRING, absent);
}
- static final class DistributedMapCacheClientImpl extends
AbstractControllerService implements DistributedMapCacheClient {
+ static final class EphemeralMapCacheClientService extends
AbstractControllerService implements DistributedMapCacheClient {
private Map<String, String> map = new HashMap<String, String>();