tpalfy commented on a change in pull request #4510:
URL: https://github.com/apache/nifi/pull/4510#discussion_r488761544



##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides connections to an embedded 
Hazelcast instance started by NiFi." +
+        " The server does not asks for authentication, it is suggested to run 
it within secured network.")

Review comment:
       ```suggestion
           " The server does not ask for authentication; it is recommended to 
run it within a secured network.")
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cache;
+
+import com.hazelcast.map.IMap;
+import com.hazelcast.map.ReachedMaxSizeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/**
+ * Implementation of {@link HazelcastCache} backed by Hazelcast's IMap data 
structure. It's purpose is to wrap Hazelcast implementation specific details in 
order to
+ * make it possible to easily change version or data structure.
+ */
+public class IMapBasedHazelcastCache implements HazelcastCache {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IMapBasedHazelcastCache.class);
+
+    private final String name;
+    private final long ttlInMillis;
+    private final IMap<String, byte[]> repository;
+
+    /**
+     * @param name Name of the cache stored for identification.
+     * @param ttlInMillis The guaranteed lifetime of a cache entry in 
milliseconds.
+     * @param repository Reference to the actual storage. It should be the 
IMap with the same identifier as cache name.
+     */
+    public IMapBasedHazelcastCache(
+            final String name,
+            final long ttlInMillis,
+            final IMap<String, byte[]> repository) {
+        this.name = name;
+        this.ttlInMillis = ttlInMillis;
+        this.repository = repository;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public byte[] get(final String key) {
+        return repository.get(key);
+    }
+
+    @Override
+    public byte[] putIfAbsent(final String key, final byte[] value) {
+        return repository.putIfAbsent(key, value, ttlInMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean put(final String key, final byte[] value) {
+        try {
+            repository.put(key, value, ttlInMillis, TimeUnit.MILLISECONDS);
+            return true;
+        } catch (final ReachedMaxSizeException e) {
+            LOGGER.error("Cache {} reached the maximum allowed size!", name);
+            return false;
+        }
+    }
+
+    @Override
+    public boolean contains(final String key) {
+        return repository.containsKey(key);
+    }
+
+    @Override
+    public boolean remove(final String key) {
+        return repository.remove(key) != null;
+    }
+
+    @Override
+    public int removeAll(final Predicate<String> keyMatcher) {
+        // Note: the Hazelcast IMap provides support for predicate based 
<code>removeAll</code> method, but it neither atomic or provides information 
about the number of deleted items.

Review comment:
       ```suggestion
           // Note: the Hazelcast IMap provides support for predicate based 
<code>removeAll</code> method, but it's neither atomic nor provides information 
about the number of deleted items.
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")

Review comment:
       ```suggestion
               .description("The name of a given repository. A Hazelcast 
cluster may handle multiple independent caches, each identified by a name." +
                       " Clients using caches with the same name are working on 
the same repository.")
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast entry TTL")
+            .description("Indicates how long the written entries should exist 
in Hazelcast. Setting it to '0 secs' means that the data" +
+                    "will exists until it's deletion or until the Hazelcast 
server is shut down.")

Review comment:
       ```suggestion
                       " will exist until its deletion or until the Hazelcast 
server is shut down.")
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast entry TTL")
+            .description("Indicates how long the written entries should exist 
in Hazelcast. Setting it to '0 secs' means that the data" +
+                    "will exists until it's deletion or until the Hazelcast 
server is shut down.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs") // Note: in case of Hazelcast IMap, 
negative value would mean "map default" which might be overridden from a 
different client.
+            .build();
+
+    private static final long STARTING_VERSION = 1;
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HAZELCAST_CACHE_MANAGER);
+        properties.add(HAZELCAST_CACHE_NAME);
+        properties.add(HAZELCAST_ENTRY_TTL);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
+    }
+
+    private volatile HazelcastCache cache = null;
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final HazelcastCacheManager hazelcastCacheManager = 
context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class);
+        cache = hazelcastCacheManager.getCache(
+                context.getProperty(HAZELCAST_CACHE_NAME).getValue(),
+                
context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS));
+        getLogger().debug("Enable Hazelcast cache client for cache " + 
cache.name());
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        // The cache state will be preserved until the Service is not stopped!
+        getLogger().debug("Disable Hazelcast cache client for cache " + 
cache.name());
+        cache = null;
+    }
+
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final 
Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws 
IOException {
+        final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
+        return (result == null) ? null : new AtomicCacheEntry<>(key, 
parsePayload(valueDeserializer, result), parseVersion(result));
+    }
+
+    @Override
+    public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, 
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+        if (entry.getKey() == null) {
+            return false;
+        }
+
+        final String key = getCacheEntryKey(entry.getKey(), keySerializer);
+
+        try(final HazelcastCache.HazelcastCacheEntryLock lock = 
cache.acquireLock(key)) {
+            final byte[] oldValue = cache.get(key);
+
+            if (oldValue == null && (!entry.getRevision().isPresent() || 
entry.getRevision().get() < STARTING_VERSION)) {
+                cache.put(key, serialize(entry.getValue(), valueSerializer, 
STARTING_VERSION));
+                getLogger().debug("Entry with key " + key + " was added during 
replace");
+                return true;
+            } else if (oldValue != null && entry.getRevision().get() == 
parseVersion(oldValue)) {
+                cache.put(key, serialize(entry.getValue(), valueSerializer, 
entry.getRevision().get() + 1));
+                getLogger().debug("Entry with key " + key + " was updated 
during replace, with revision " + entry.getRevision().get() + 1);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+        return cache.putIfAbsent(getCacheEntryKey(key, keySerializer), 
serialize(value, valueSerializer, STARTING_VERSION)) == null;
+    }
+
+    @Override
+    public <K, V> V getAndPutIfAbsent(
+            final K key, final V value, final Serializer<K> keySerializer, 
final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer
+    ) throws IOException {
+        final byte[] result = cache.putIfAbsent(getCacheEntryKey(key, 
keySerializer), serialize(value, valueSerializer, STARTING_VERSION));
+        return (result == null) ? null : parsePayload(valueDeserializer, 
result);
+    }
+
+    @Override
+    public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
+        return cache.contains(getCacheEntryKey(key, keySerializer));
+    }
+
+    @Override
+    public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        cache.put(getCacheEntryKey(key, keySerializer), serialize(value, 
valueSerializer, STARTING_VERSION));
+    }
+
+    @Override
+    public <K, V> V get(final K key, final Serializer<K> keySerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
+        final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
+        return result == null ? null : parsePayload(valueDeserializer, result);
+    }
+
+    @Override
+    public <K> boolean remove(final K key, final Serializer<K> keySerializer) 
throws IOException {
+        return cache.remove(getCacheEntryKey(key, keySerializer));
+    }
+
+    @Override
+    public long removeByPattern(final String regex) throws IOException {
+        return cache.removeAll(new RegexPredicate(regex));
+    }
+
+    private static class RegexPredicate implements Predicate<String>, 
Serializable {
+        private final Pattern pattern;
+
+        private RegexPredicate(final String regex) {
+            this.pattern = Pattern.compile(regex);
+        }
+
+        @Override
+        public boolean test(final String string) {
+            return pattern.matcher(string).matches();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        getLogger().debug("Closing " + this.getClass().getSimpleName());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    private static long parseVersion(final byte[] value) {
+        return LongUtil.fromPaddedBytes(Arrays.copyOfRange(value, 0, 
Long.BYTES));
+    }
+
+    private static <V> V parsePayload(final Deserializer<V> deserializer, 
final byte[] value) throws IOException {
+        return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, 
value.length));
+    }
+
+    private <S> String getCacheEntryKey(final S value, final Serializer<S> 
serializer) throws IOException {
+        final String result;
+
+        if (value instanceof String) {
+            result = (String) value;
+        } else {
+            final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+            serializer.serialize(value, stream);
+            result = stream.toString("UTF-8");
+        }
+
+        if (result.isEmpty()) {
+            throw new IOException("Cache record key cannot be empty!");
+        }
+
+        return result;
+    }
+
+    /**
+     * Serializes incoming value using the given serializer. The end result is 
a byte array might be parsed by this
+     * implementation. As a convention, the first eight bytes of the array 
contains a long value, serves as version
+     * identifier. From the ninth byte, the actual value starts. During 
parsing this, the version must be read every
+     * time as convention.

Review comment:
       ```suggestion
        * Serializes a value using the given serializer. The first eight bytes 
of the array contain the revision.
        * The rest holds the actual serialized value.
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/IMapBasedHazelcastCacheManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cache.IMapBasedHazelcastCache;
+import org.apache.nifi.reporting.InitializationException;
+
+abstract class IMapBasedHazelcastCacheManager extends 
AbstractControllerService implements HazelcastCacheManager {
+    protected static final String ADDRESS_SEPARATOR = ",";
+
+    public static final PropertyDescriptor HAZELCAST_CLUSTER_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cluster-name")
+            .displayName("Hazelcast Cluster Name")
+            .description("Name of the Hazelcast instance's cluster.")
+            .defaultValue("nifi") // Hazelcast's default is "dev", "nifi" 
overwrites this.
+            .required(false)
+            .addValidator(Validator.VALID)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private volatile HazelcastInstance instance;
+
+    @Override
+    public HazelcastCache getCache(final String name, final long ttlInMillis) {
+        return new IMapBasedHazelcastCache(name, ttlInMillis, 
instance.getMap(name));
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
+        try {
+            instance = getInstance(context);
+        } catch (final Exception e) {
+            getLogger().error("Could not initialize Hazelcast connection. 
Reason: " + e.getMessage(), e);

Review comment:
       ```suggestion
               getLogger().error("Could not create Hazelcast instance. Reason: 
" + e.getMessage(), e);
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.

Review comment:
       ```suggestion
    * Note: By design, the client should not directly depend on Hazelcast 
specific classes to allow easy version and implementation changes.
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast entry TTL")

Review comment:
       ```suggestion
               .displayName("Hazelcast Entry Lifetime")
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")

Review comment:
       ```suggestion
               .description("A Hazelcast Cache Manager which manages 
connections to Hazelcast and provides cache instances")
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cache;
+
+import com.hazelcast.map.IMap;
+import com.hazelcast.map.ReachedMaxSizeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/**
+ * Implementation of {@link HazelcastCache} backed by Hazelcast's IMap data 
structure. Its purpose is to wrap Hazelcast implementation specific details in 
order to
+ * make it possible to easily change version or data structure.
+ */
+public class IMapBasedHazelcastCache implements HazelcastCache {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IMapBasedHazelcastCache.class);
+
+    private final String name;
+    private final long ttlInMillis;
+    private final IMap<String, byte[]> repository;
+
+    /**
+     * @param name Name of the cache stored for identification.
+     * @param ttlInMillis The guaranteed lifetime of a cache entry in 
milliseconds.
+     * @param repository Reference to the actual storage. It should be the 
IMap with the same identifier as cache name.
+     */
+    public IMapBasedHazelcastCache(
+            final String name,
+            final long ttlInMillis,
+            final IMap<String, byte[]> repository) {
+        this.name = name;
+        this.ttlInMillis = ttlInMillis;
+        this.repository = repository;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public byte[] get(final String key) {
+        return repository.get(key);
+    }
+
+    @Override
+    public byte[] putIfAbsent(final String key, final byte[] value) {
+        return repository.putIfAbsent(key, value, ttlInMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean put(final String key, final byte[] value) {
+        try {
+            repository.put(key, value, ttlInMillis, TimeUnit.MILLISECONDS);
+            return true;
+        } catch (final ReachedMaxSizeException e) {
+            LOGGER.error("Cache {} reached the maximum allowed size!", name);
+            return false;
+        }
+    }
+
+    @Override
+    public boolean contains(final String key) {
+        return repository.containsKey(key);
+    }
+
+    @Override
+    public boolean remove(final String key) {
+        return repository.remove(key) != null;
+    }
+
+    @Override
+    public int removeAll(final Predicate<String> keyMatcher) {
+        // Note: the Hazelcast IMap provides support for predicate based 
<code>removeAll</code> method, but it is neither atomic nor provides information 
about the number of deleted items.
+        final Set<String> keys = repository.keySet();
+        int result = 0;
+
+        for (final String key : keys) {
+            if (keyMatcher.test(key)) {
+                repository.remove(key);

Review comment:
       ```suggestion
                   repository.delete(key);
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast entry TTL")
+            .description("Indicates how long the written entries should exist 
in Hazelcast. Setting it to '0 secs' means that the data" +
+                    "will exists until it's deletion or until the Hazelcast 
server is shut down.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs") // Note: in case of Hazelcast IMap, 
negative value would mean "map default" which might be overridden from a 
different client.

Review comment:
       ```suggestion
               .defaultValue("0 secs") // Note: in case of Hazelcast IMap, 
negative value would mean "map default" which might be overridden by a 
different client.
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides connections to an embedded 
Hazelcast instance started by NiFi." +
+        " The server does not asks for authentication, it is suggested to run 
it within secured network.")
+public class EmbeddedHazelcastCacheManager extends 
IMapBasedHazelcastCacheManager {
+
+    private static final int DEFAULT_HAZELCAST_PORT = 5701;
+
+    private static final AllowableValue HA_NONE = new AllowableValue("none", 
"None", "No high availability or data replication is provided," +
+            " every node has access only the data stored within that node.");
+    private static final AllowableValue HA_CLUSTER = new 
AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the 
NiFi cluster:" +
+            " It expects every NiFi nodes to have a running Hazelcast instance 
on the same port as specified in the Hazelcast Port property. No explicit 
listing of the" +
+            " instances is needed.");
+    private static final AllowableValue HA_EXPLICIT = new 
AllowableValue("explicit", "Explicit", "Works with an explicit list of 
Hazelcast instances," +
+            " creating cluster using those. This provides greater control over 
the used servers, making it possible to utilize only a number of nodes as 
Hazelcast server." +
+            " The list of Hazelcast instances takes place in property 
\"Hazelcast Instances\".");
+
+    private static final PropertyDescriptor HAZELCAST_PORT = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-port")
+            .displayName("Hazelcast Port")
+            .description("Port the Hazelcast uses as starting port. If not 
specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT 
+ ".")
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final PropertyDescriptor HAZELCAST_HA_MODE = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-ha-mode")
+            .displayName("Hazelcast High Availability Mode")
+            .description("Specifies in what strategy the Hazelcast cluster 
should be created.")
+            .required(true)
+            .allowableValues(HA_NONE, HA_CLUSTER, HA_EXPLICIT)
+            .defaultValue(HA_NONE.getValue()) // None is used for default in 
order to be valid with standalone NiFi.
+            .build();
+
+    private static final PropertyDescriptor HAZELCAST_INSTANCES = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-instances")
+            .displayName("Hazelcast Instances")
+            .description("List of Hazelcast instances should be part of the 
cluster, using {host:port} format separated the instances by comma." +
+                    " Only used when high availability mode is set to 
\"Explicit\". The list must contain every instances will be part of the 
cluster.")
+            .required(false)
+            .addValidator(StandardValidators.URI_LIST_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+                HAZELCAST_CLUSTER_NAME,
+                HAZELCAST_PORT,
+                HAZELCAST_HA_MODE,
+                HAZELCAST_INSTANCES
+        ));
+    }
+
+    @Override
+    protected HazelcastInstance getInstance(final ConfigurationContext 
context) {
+        final String instanceName = UUID.randomUUID().toString();
+        final Config config = new Config(instanceName);
+        final NetworkConfig networkConfig = config.getNetworkConfig();
+        final TcpIpConfig tcpIpConfig = 
networkConfig.getJoin().getTcpIpConfig();
+        final String haMode = 
context.getProperty(HAZELCAST_HA_MODE).getValue();
+
+        if (context.getProperty(HAZELCAST_CLUSTER_NAME).isSet()) {
+            
config.setClusterName(context.getProperty(HAZELCAST_CLUSTER_NAME).evaluateAttributeExpressions().getValue());
+        }
+
+        final int port = context.getProperty(HAZELCAST_PORT).isSet()
+                ? 
context.getProperty(HAZELCAST_PORT).evaluateAttributeExpressions().asInteger()
+                : DEFAULT_HAZELCAST_PORT;

Review comment:
       ```suggestion
           final int port = 
context.getProperty(HAZELCAST_PORT).evaluateAttributeExpressions().asInteger();
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides connections to an embedded 
Hazelcast instance started by NiFi." +
+        " The server does not asks for authentication, it is suggested to run 
it within secured network.")
+public class EmbeddedHazelcastCacheManager extends 
IMapBasedHazelcastCacheManager {
+
+    private static final int DEFAULT_HAZELCAST_PORT = 5701;
+
+    private static final AllowableValue HA_NONE = new AllowableValue("none", 
"None", "No high availability or data replication is provided," +
+            " every node has access only the data stored within that node.");
+    private static final AllowableValue HA_CLUSTER = new 
AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the 
NiFi cluster:" +
+            " It expects every NiFi nodes to have a running Hazelcast instance 
on the same port as specified in the Hazelcast Port property. No explicit 
listing of the" +
+            " instances is needed.");
+    private static final AllowableValue HA_EXPLICIT = new 
AllowableValue("explicit", "Explicit", "Works with an explicit list of 
Hazelcast instances," +
+            " creating cluster using those. This provides greater control over 
the used servers, making it possible to utilize only a number of nodes as 
Hazelcast server." +
+            " The list of Hazelcast instances takes place in property 
\"Hazelcast Instances\".");
+
+    private static final PropertyDescriptor HAZELCAST_PORT = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-port")
+            .displayName("Hazelcast Port")
+            .description("Port the Hazelcast uses as starting port. If not 
specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT 
+ ".")
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final PropertyDescriptor HAZELCAST_HA_MODE = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-ha-mode")
+            .displayName("Hazelcast High Availability Mode")
+            .description("Specifies in what strategy the Hazelcast cluster 
should be created.")

Review comment:
       ```suggestion
               .description("Specifies with what strategy the Hazelcast cluster 
should be created.")
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast entry TTL")
+            .description("Indicates how long the written entries should exist 
in Hazelcast. Setting it to '0 secs' means that the data" +
+                    "will exists until it's deletion or until the Hazelcast 
server is shut down.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs") // Note: in case of Hazelcast IMap, 
negative value would mean "map default" which might be overridden from a 
different client.
+            .build();
+
+    private static final long STARTING_VERSION = 1;

Review comment:
       ```suggestion
       private static final long STARTING_REVISION = 1;
   ```
   In general, for consistency's sake, `revision` could be better than `version` 
in other places as well (for example in `parseVersion`).

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides connections to an embedded 
Hazelcast instance started by NiFi." +
+        " The server does not asks for authentication, it is suggested to run 
it within secured network.")
+public class EmbeddedHazelcastCacheManager extends 
IMapBasedHazelcastCacheManager {
+
+    private static final int DEFAULT_HAZELCAST_PORT = 5701;
+
+    private static final AllowableValue HA_NONE = new AllowableValue("none", 
"None", "No high availability or data replication is provided," +
+            " every node has access only the data stored within that node.");
+    private static final AllowableValue HA_CLUSTER = new 
AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the 
NiFi cluster:" +
+            " It expects every NiFi nodes to have a running Hazelcast instance 
on the same port as specified in the Hazelcast Port property. No explicit 
listing of the" +
+            " instances is needed.");
+    private static final AllowableValue HA_EXPLICIT = new 
AllowableValue("explicit", "Explicit", "Works with an explicit list of 
Hazelcast instances," +
+            " creating cluster using those. This provides greater control over 
the used servers, making it possible to utilize only a number of nodes as 
Hazelcast server." +
+            " The list of Hazelcast instances takes place in property 
\"Hazelcast Instances\".");
+
+    private static final PropertyDescriptor HAZELCAST_PORT = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-port")
+            .displayName("Hazelcast Port")
+            .description("Port the Hazelcast uses as starting port. If not 
specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT 
+ ".")
+            .required(false)

Review comment:
       ```suggestion
               .required(true)
               .defaultValue(DEFAULT_HAZELCAST_PORT)
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast entry TTL")
+            .description("Indicates how long the written entries should exist 
in Hazelcast. Setting it to '0 secs' means that the data" +
+                    "will exists until it's deletion or until the Hazelcast 
server is shut down.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs") // Note: in case of Hazelcast IMap, 
negative value would mean "map default" which might be overridden from a 
different client.
+            .build();
+
+    private static final long STARTING_VERSION = 1;
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HAZELCAST_CACHE_MANAGER);
+        properties.add(HAZELCAST_CACHE_NAME);
+        properties.add(HAZELCAST_ENTRY_TTL);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
+    }
+
+    private volatile HazelcastCache cache = null;
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final HazelcastCacheManager hazelcastCacheManager = 
context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class);
+        cache = hazelcastCacheManager.getCache(
+                context.getProperty(HAZELCAST_CACHE_NAME).getValue(),
+                
context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS));
+        getLogger().debug("Enable Hazelcast cache client for cache " + 
cache.name());
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        // The cache state will be preserved until the Service is not stopped!
+        getLogger().debug("Disable Hazelcast cache client for cache " + 
cache.name());
+        cache = null;
+    }
+
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final 
Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws 
IOException {
+        final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
+        return (result == null) ? null : new AtomicCacheEntry<>(key, 
parsePayload(valueDeserializer, result), parseVersion(result));
+    }
+
+    @Override
+    public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, 
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+        if (entry.getKey() == null) {
+            return false;
+        }
+
+        final String key = getCacheEntryKey(entry.getKey(), keySerializer);
+
+        try(final HazelcastCache.HazelcastCacheEntryLock lock = 
cache.acquireLock(key)) {
+            final byte[] oldValue = cache.get(key);
+
+            if (oldValue == null && (!entry.getRevision().isPresent() || 
entry.getRevision().get() < STARTING_VERSION)) {
+                cache.put(key, serialize(entry.getValue(), valueSerializer, 
STARTING_VERSION));
+                getLogger().debug("Entry with key " + key + " was added during 
replace");
+                return true;
+            } else if (oldValue != null && entry.getRevision().get() == 
parseVersion(oldValue)) {
+                cache.put(key, serialize(entry.getValue(), valueSerializer, 
entry.getRevision().get() + 1));
+                getLogger().debug("Entry with key " + key + " was updated 
during replace, with revision " + entry.getRevision().get() + 1);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+        return cache.putIfAbsent(getCacheEntryKey(key, keySerializer), 
serialize(value, valueSerializer, STARTING_VERSION)) == null;
+    }
+
+    @Override
+    public <K, V> V getAndPutIfAbsent(
+            final K key, final V value, final Serializer<K> keySerializer, 
final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer
+    ) throws IOException {
+        final byte[] result = cache.putIfAbsent(getCacheEntryKey(key, 
keySerializer), serialize(value, valueSerializer, STARTING_VERSION));
+        return (result == null) ? null : parsePayload(valueDeserializer, 
result);
+    }
+
+    @Override
+    public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
+        return cache.contains(getCacheEntryKey(key, keySerializer));
+    }
+
+    @Override
+    public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        cache.put(getCacheEntryKey(key, keySerializer), serialize(value, 
valueSerializer, STARTING_VERSION));
+    }
+
+    @Override
+    public <K, V> V get(final K key, final Serializer<K> keySerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
+        final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
+        return result == null ? null : parsePayload(valueDeserializer, result);
+    }
+
+    @Override
+    public <K> boolean remove(final K key, final Serializer<K> keySerializer) 
throws IOException {
+        return cache.remove(getCacheEntryKey(key, keySerializer));
+    }
+
+    @Override
+    public long removeByPattern(final String regex) throws IOException {
+        return cache.removeAll(new RegexPredicate(regex));
+    }
+
+    private static class RegexPredicate implements Predicate<String>, 
Serializable {
+        private final Pattern pattern;
+
+        private RegexPredicate(final String regex) {
+            this.pattern = Pattern.compile(regex);
+        }
+
+        @Override
+        public boolean test(final String string) {
+            return pattern.matcher(string).matches();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        getLogger().debug("Closing " + this.getClass().getSimpleName());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    private static long parseVersion(final byte[] value) {
+        return LongUtil.fromPaddedBytes(Arrays.copyOfRange(value, 0, 
Long.BYTES));
+    }
+
+    private static <V> V parsePayload(final Deserializer<V> deserializer, 
final byte[] value) throws IOException {
+        return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, 
value.length));
+    }
+
+    private <S> String getCacheEntryKey(final S value, final Serializer<S> 
serializer) throws IOException {

Review comment:
       ```suggestion
       private <S> String getCacheEntryKey(final S key, final Serializer<S> 
serializer) throws IOException {
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast entry TTL")
+            .description("Indicates how long the written entries should exist 
in Hazelcast. Setting it to '0 secs' means that the data" +
+                    "will exists until it's deletion or until the Hazelcast 
server is shut down.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs") // Note: in case of Hazelcast IMap, 
negative value would mean "map default" which might be overridden from a 
different client.
+            .build();
+
+    private static final long STARTING_VERSION = 1;
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HAZELCAST_CACHE_MANAGER);
+        properties.add(HAZELCAST_CACHE_NAME);
+        properties.add(HAZELCAST_ENTRY_TTL);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
+    }
+
+    private volatile HazelcastCache cache = null;
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final HazelcastCacheManager hazelcastCacheManager = 
context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class);
+        cache = hazelcastCacheManager.getCache(
+                context.getProperty(HAZELCAST_CACHE_NAME).getValue(),
+                
context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS));
+        getLogger().debug("Enable Hazelcast cache client for cache " + 
cache.name());
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        // The cache state will be preserved until the Service is not stopped!
+        getLogger().debug("Disable Hazelcast cache client for cache " + 
cache.name());
+        cache = null;
+    }
+
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final 
Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws 
IOException {
+        final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
+        return (result == null) ? null : new AtomicCacheEntry<>(key, 
parsePayload(valueDeserializer, result), parseVersion(result));
+    }
+
+    @Override
+    public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, 
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+        if (entry.getKey() == null) {
+            return false;
+        }
+
+        final String key = getCacheEntryKey(entry.getKey(), keySerializer);
+
+        try(final HazelcastCache.HazelcastCacheEntryLock lock = 
cache.acquireLock(key)) {
+            final byte[] oldValue = cache.get(key);
+
+            if (oldValue == null && (!entry.getRevision().isPresent() || 
entry.getRevision().get() < STARTING_VERSION)) {
+                cache.put(key, serialize(entry.getValue(), valueSerializer, 
STARTING_VERSION));
+                getLogger().debug("Entry with key " + key + " was added during 
replace");
+                return true;
+            } else if (oldValue != null && entry.getRevision().get() == 
parseVersion(oldValue)) {
+                cache.put(key, serialize(entry.getValue(), valueSerializer, 
entry.getRevision().get() + 1));
+                getLogger().debug("Entry with key " + key + " was updated 
during replace, with revision " + entry.getRevision().get() + 1);

Review comment:
       Not entirely sure, but as I understand the interface, 
`replace` should _not_ change the revision.

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast entry TTL")
+            .description("Indicates how long the written entries should exist 
in Hazelcast. Setting it to '0 secs' means that the data" +
+                    "will exists until it's deletion or until the Hazelcast 
server is shut down.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs") // Note: in case of Hazelcast IMap, 
negative value would mean "map default" which might be overridden from a 
different client.
+            .build();
+
+    private static final long STARTING_VERSION = 1;
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HAZELCAST_CACHE_MANAGER);
+        properties.add(HAZELCAST_CACHE_NAME);
+        properties.add(HAZELCAST_ENTRY_TTL);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
+    }
+
+    private volatile HazelcastCache cache = null;
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final HazelcastCacheManager hazelcastCacheManager = 
context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class);
+        cache = hazelcastCacheManager.getCache(
+                context.getProperty(HAZELCAST_CACHE_NAME).getValue(),
+                
context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS));
+        getLogger().debug("Enable Hazelcast cache client for cache " + 
cache.name());
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        // The cache state will be preserved until the Service is not stopped!
+        getLogger().debug("Disable Hazelcast cache client for cache " + 
cache.name());
+        cache = null;
+    }
+
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final 
Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws 
IOException {
+        final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
+        return (result == null) ? null : new AtomicCacheEntry<>(key, 
parsePayload(valueDeserializer, result), parseVersion(result));
+    }
+
+    @Override
+    public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, 
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+        if (entry.getKey() == null) {
+            return false;
+        }
+
+        final String key = getCacheEntryKey(entry.getKey(), keySerializer);
+
+        try(final HazelcastCache.HazelcastCacheEntryLock lock = 
cache.acquireLock(key)) {
+            final byte[] oldValue = cache.get(key);
+
+            if (oldValue == null && (!entry.getRevision().isPresent() || 
entry.getRevision().get() < STARTING_VERSION)) {
+                cache.put(key, serialize(entry.getValue(), valueSerializer, 
STARTING_VERSION));
+                getLogger().debug("Entry with key " + key + " was added during 
replace");
+                return true;
+            } else if (oldValue != null && entry.getRevision().get() == 
parseVersion(oldValue)) {

Review comment:
       ```suggestion
               } else if (oldValue != null && 
Objects.equals(entry.getRevision().get(), parseVersion(oldValue))) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/IMapBasedHazelcastCacheManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cache.IMapBasedHazelcastCache;
+import org.apache.nifi.reporting.InitializationException;
+
+abstract class IMapBasedHazelcastCacheManager extends 
AbstractControllerService implements HazelcastCacheManager {
+    protected static final String ADDRESS_SEPARATOR = ",";
+
+    public static final PropertyDescriptor HAZELCAST_CLUSTER_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cluster-name")
+            .displayName("Hazelcast Cluster Name")
+            .description("Name of the Hazelcast instance's cluster.")

Review comment:
       ```suggestion
               .description("Name of the Hazelcast cluster.")
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast entry TTL")
+            .description("Indicates how long the written entries should exist 
in Hazelcast. Setting it to '0 secs' means that the data" +
+                    "will exists until it's deletion or until the Hazelcast 
server is shut down.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs") // Note: in case of Hazelcast IMap, 
negative value would mean "map default" which might be overridden from a 
different client.
+            .build();
+
+    private static final long STARTING_VERSION = 1;
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HAZELCAST_CACHE_MANAGER);
+        properties.add(HAZELCAST_CACHE_NAME);
+        properties.add(HAZELCAST_ENTRY_TTL);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
+    }
+
+    private volatile HazelcastCache cache = null;
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final HazelcastCacheManager hazelcastCacheManager = 
context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class);
+        cache = hazelcastCacheManager.getCache(
+                context.getProperty(HAZELCAST_CACHE_NAME).getValue(),

Review comment:
       ```suggestion
                   
context.getProperty(HAZELCAST_CACHE_NAME).evaluateAttributeExpressions().getValue(),
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides connections to an embedded 
Hazelcast instance started by NiFi." +
+        " The server does not asks for authentication, it is suggested to run 
it within secured network.")
+public class EmbeddedHazelcastCacheManager extends 
IMapBasedHazelcastCacheManager {
+
+    private static final int DEFAULT_HAZELCAST_PORT = 5701;
+
+    private static final AllowableValue HA_NONE = new AllowableValue("none", 
"None", "No high availability or data replication is provided," +
+            " every node has access only the data stored within that node.");
+    private static final AllowableValue HA_CLUSTER = new 
AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the 
NiFi cluster:" +
+            " It expects every NiFi nodes to have a running Hazelcast instance 
on the same port as specified in the Hazelcast Port property. No explicit 
listing of the" +
+            " instances is needed.");
+    private static final AllowableValue HA_EXPLICIT = new 
AllowableValue("explicit", "Explicit", "Works with an explicit list of 
Hazelcast instances," +
+            " creating cluster using those. This provides greater control over 
the used servers, making it possible to utilize only a number of nodes as 
Hazelcast server." +
+            " The list of Hazelcast instances takes place in property 
\"Hazelcast Instances\".");

Review comment:
       ```suggestion
       private static final AllowableValue HA_EXPLICIT = new 
AllowableValue("explicit", "Explicit", "Works with an explicit list of 
Hazelcast instances," +
               " creating a cluster using those. This provides greater control, 
making it possible to utilize only certain nodes as Hazelcast servers." +
               " The list of Hazelcast instances can be set in the property 
\"Hazelcast Instances\".");
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.hazelcast.services.util.LongUtil;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the 
backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast 
specific classes due to ease version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Hazelcast as the backing cache. This service relies on " +
+        "an abstracted repository manages the actual Hazelcast calls, provided 
by HazelcastConnectionService.")
+public class HazelcastMapCacheClient extends AbstractControllerService 
implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections 
to Hazelcast and providing cache instances")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given repository. Within a Hazelcast 
cluster, multiple unrelated caches might be used." +
+                    "Clients using the same cache name will depend on the same 
data structure.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review comment:
       ```suggestion
               .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides connections to an embedded 
Hazelcast instance started by NiFi." +
+        " The server does not asks for authentication, it is suggested to run 
it within secured network.")
+public class EmbeddedHazelcastCacheManager extends 
IMapBasedHazelcastCacheManager {
+
+    private static final int DEFAULT_HAZELCAST_PORT = 5701;
+
+    private static final AllowableValue HA_NONE = new AllowableValue("none", 
"None", "No high availability or data replication is provided," +
+            " every node has access only the data stored within that node.");
+    private static final AllowableValue HA_CLUSTER = new 
AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the 
NiFi cluster:" +
+            " It expects every NiFi nodes to have a running Hazelcast instance 
on the same port as specified in the Hazelcast Port property. No explicit 
listing of the" +
+            " instances is needed.");
+    private static final AllowableValue HA_EXPLICIT = new 
AllowableValue("explicit", "Explicit", "Works with an explicit list of 
Hazelcast instances," +
+            " creating cluster using those. This provides greater control over 
the used servers, making it possible to utilize only a number of nodes as 
Hazelcast server." +
+            " The list of Hazelcast instances takes place in property 
\"Hazelcast Instances\".");
+
+    private static final PropertyDescriptor HAZELCAST_PORT = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-port")
+            .displayName("Hazelcast Port")
+            .description("Port the Hazelcast uses as starting port. If not 
specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT 
+ ".")
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final PropertyDescriptor HAZELCAST_HA_MODE = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-ha-mode")
+            .displayName("Hazelcast High Availability Mode")
+            .description("Specifies in what strategy the Hazelcast cluster 
should be created.")
+            .required(true)
+            .allowableValues(HA_NONE, HA_CLUSTER, HA_EXPLICIT)
+            .defaultValue(HA_NONE.getValue()) // None is used for default in 
order to be valid with standalone NiFi.
+            .build();
+
+    private static final PropertyDescriptor HAZELCAST_INSTANCES = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-instances")
+            .displayName("Hazelcast Instances")
+            .description("List of Hazelcast instances should be part of the 
cluster, using {host:port} format separated the instances by comma." +
+                    " Only used when high availability mode is set to 
\"Explicit\". The list must contain every instances will be part of the 
cluster.")

Review comment:
       ```suggestion
               .description("List of Hazelcast instances that should be part of 
the cluster. Instances are using {host:port} format, separated by comma." +
                       " Only used when high availability mode is set to 
\"Explicit\". The list must contain every instances that will be part of the 
cluster.")
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides connections to an embedded 
Hazelcast instance started by NiFi." +
+        " The server does not asks for authentication, it is suggested to run 
it within secured network.")
+public class EmbeddedHazelcastCacheManager extends 
IMapBasedHazelcastCacheManager {
+
+    private static final int DEFAULT_HAZELCAST_PORT = 5701;
+
+    private static final AllowableValue HA_NONE = new AllowableValue("none", 
"None", "No high availability or data replication is provided," +
+            " every node has access only the data stored within that node.");
+    private static final AllowableValue HA_CLUSTER = new 
AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the 
NiFi cluster:" +
+            " It expects every NiFi nodes to have a running Hazelcast instance 
on the same port as specified in the Hazelcast Port property. No explicit 
listing of the" +
+            " instances is needed.");
+    private static final AllowableValue HA_EXPLICIT = new 
AllowableValue("explicit", "Explicit", "Works with an explicit list of 
Hazelcast instances," +
+            " creating cluster using those. This provides greater control over 
the used servers, making it possible to utilize only a number of nodes as 
Hazelcast server." +
+            " The list of Hazelcast instances takes place in property 
\"Hazelcast Instances\".");
+
+    private static final PropertyDescriptor HAZELCAST_PORT = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-port")
+            .displayName("Hazelcast Port")
+            .description("Port the Hazelcast uses as starting port. If not 
specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT 
+ ".")

Review comment:
       ```suggestion
               .description("Port for the Hazelcast instance to use. If not 
specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT 
+ ".")
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides connections to an embedded 
Hazelcast instance started by NiFi." +
+        " The server does not asks for authentication, it is suggested to run 
it within secured network.")
+public class EmbeddedHazelcastCacheManager extends 
IMapBasedHazelcastCacheManager {
+
+    private static final int DEFAULT_HAZELCAST_PORT = 5701;
+
+    private static final AllowableValue HA_NONE = new AllowableValue("none", 
"None", "No high availability or data replication is provided," +
+            " every node has access only the data stored within that node.");
+    private static final AllowableValue HA_CLUSTER = new 
AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the 
NiFi cluster:" +
+            " It expects every NiFi nodes to have a running Hazelcast instance 
on the same port as specified in the Hazelcast Port property. No explicit 
listing of the" +
+            " instances is needed.");
+    private static final AllowableValue HA_EXPLICIT = new 
AllowableValue("explicit", "Explicit", "Works with an explicit list of 
Hazelcast instances," +
+            " creating cluster using those. This provides greater control over 
the used servers, making it possible to utilize only a number of nodes as 
Hazelcast server." +
+            " The list of Hazelcast instances takes place in property 
\"Hazelcast Instances\".");
+
+    private static final PropertyDescriptor HAZELCAST_PORT = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-port")
+            .displayName("Hazelcast Port")
+            .description("Port the Hazelcast uses as starting port. If not 
specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT 
+ ".")
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final PropertyDescriptor HAZELCAST_HA_MODE = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-ha-mode")
+            .displayName("Hazelcast High Availability Mode")
+            .description("Specifies in what strategy the Hazelcast cluster 
should be created.")
+            .required(true)
+            .allowableValues(HA_NONE, HA_CLUSTER, HA_EXPLICIT)
+            .defaultValue(HA_NONE.getValue()) // None is used for default in 
order to be valid with standalone NiFi.
+            .build();
+
+    private static final PropertyDescriptor HAZELCAST_INSTANCES = new 
PropertyDescriptor.Builder()
+            .name("hazelcast-instances")
+            .displayName("Hazelcast Instances")
+            .description("List of Hazelcast instances should be part of the 
cluster, using {host:port} format separated the instances by comma." +
+                    " Only used when high availability mode is set to 
\"Explicit\". The list must contain every instances will be part of the 
cluster.")
+            .required(false)
+            .addValidator(StandardValidators.URI_LIST_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+                HAZELCAST_CLUSTER_NAME,
+                HAZELCAST_PORT,
+                HAZELCAST_HA_MODE,
+                HAZELCAST_INSTANCES
+        ));
+    }
+
+    @Override
+    protected HazelcastInstance getInstance(final ConfigurationContext 
context) {
+        final String instanceName = UUID.randomUUID().toString();
+        final Config config = new Config(instanceName);
+        final NetworkConfig networkConfig = config.getNetworkConfig();
+        final TcpIpConfig tcpIpConfig = 
networkConfig.getJoin().getTcpIpConfig();
+        final String haMode = 
context.getProperty(HAZELCAST_HA_MODE).getValue();
+
+        if (context.getProperty(HAZELCAST_CLUSTER_NAME).isSet()) {
+            
config.setClusterName(context.getProperty(HAZELCAST_CLUSTER_NAME).evaluateAttributeExpressions().getValue());
+        }
+
+        final int port = context.getProperty(HAZELCAST_PORT).isSet()
+                ? 
context.getProperty(HAZELCAST_PORT).evaluateAttributeExpressions().asInteger()
+                : DEFAULT_HAZELCAST_PORT;
+
+        // It high availability is turned off, we turn of the capability for 
the Hazelcast instance to form a cluster.

Review comment:
       ```suggestion
           // If high availability is turned off, we turn off the capability of 
the Hazelcast instance to form a cluster.
   ```

##########
File path: 
nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides connections to an embedded 
Hazelcast instance started by NiFi." +
+        " The server does not asks for authentication, it is suggested to run 
it within secured network.")
+public class EmbeddedHazelcastCacheManager extends 
IMapBasedHazelcastCacheManager {
+
+    private static final int DEFAULT_HAZELCAST_PORT = 5701;
+
+    private static final AllowableValue HA_NONE = new AllowableValue("none", 
"None", "No high availability or data replication is provided," +
+            " every node has access only the data stored within that node.");

Review comment:
       ```suggestion
       private static final AllowableValue HA_NONE = new AllowableValue("none", 
"None", "No high availability or data replication is provided," +
               " every node has access only to the data stored locally.");
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to