tpalfy commented on a change in pull request #4510: URL: https://github.com/apache/nifi/pull/4510#discussion_r488761544
########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NetworkConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +@Tags({"hazelcast", "cache"}) +@CapabilityDescription("A service that provides connections to an embedded Hazelcast instance started by NiFi." + + " The server does not asks for authentication, it is suggested to run it within secured network.") Review comment: ```suggestion " The server does not ask for authentication, it is recommended to run it within a secured network.") ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cache; + +import com.hazelcast.map.IMap; +import com.hazelcast.map.ReachedMaxSizeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +/** + * Implementation of {@link HazelcastCache} backed by Hazelcast's IMap data structure. It's purpose is to wrap Hazelcast implementation specific details in order to + * make it possible to easily change version or data structure. + */ +public class IMapBasedHazelcastCache implements HazelcastCache { + private static final Logger LOGGER = LoggerFactory.getLogger(IMapBasedHazelcastCache.class); + + private final String name; + private final long ttlInMillis; + private final IMap<String, byte[]> repository; + + /** + * @param name Name of the cache stored for identification. + * @param ttlInMillis The guaranteed lifetime of a cache entry in milliseconds. + * @param repository Reference to the actual storage. It should be the IMap with the same identifier as cache name. + */ + public IMapBasedHazelcastCache( + final String name, + final long ttlInMillis, + final IMap<String, byte[]> repository) { + this.name = name; + this.ttlInMillis = ttlInMillis; + this.repository = repository; + } + + @Override + public String name() { + return name; + } + + @Override + public byte[] get(final String key) { + return repository.get(key); + } + + @Override + public byte[] putIfAbsent(final String key, final byte[] value) { + return repository.putIfAbsent(key, value, ttlInMillis, TimeUnit.MILLISECONDS); + } + + @Override + public boolean put(final String key, final byte[] value) { + try { + repository.put(key, value, ttlInMillis, TimeUnit.MILLISECONDS); + return true; + } catch (final ReachedMaxSizeException e) { + LOGGER.error("Cache {} reached the maximum allowed size!", name); + return false; + } + } + + @Override + public boolean contains(final String key) { + return repository.containsKey(key); + } + + @Override + public boolean remove(final String key) { + return repository.remove(key) != null; + } + + @Override + public int removeAll(final Predicate<String> keyMatcher) { + // Note: the Hazelcast IMap provides support for predicate based <code>removeAll</code> method, but it neither atomic or provides information about the number of deleted items. Review comment: ```suggestion // Note: the Hazelcast IMap provides support for predicate based <code>removeAll</code> method, but it's neither atomic nor provides information about the number of deleted items. ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") Review comment: ```suggestion .description("The name of a given repository. A Hazelcast cluster may handle multiple independent caches, each identified by a name." + "Clients using caches with the same name are working on the same repository.") ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder() + .name("hazelcast-entry-ttl") + .displayName("Hazelcast entry TTL") + .description("Indicates how long the written entries should exist in Hazelcast. Setting it to '0 secs' means that the data" + + "will exists until it's deletion or until the Hazelcast server is shut down.") Review comment: ```suggestion "will exists until its deletion or until the Hazelcast server is shut down.") ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder() + .name("hazelcast-entry-ttl") + .displayName("Hazelcast entry TTL") + .description("Indicates how long the written entries should exist in Hazelcast. Setting it to '0 secs' means that the data" + + "will exists until it's deletion or until the Hazelcast server is shut down.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 secs") // Note: in case of Hazelcast IMap, negative value would mean "map default" which might be overridden from a different client. + .build(); + + private static final long STARTING_VERSION = 1; + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; + + static { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HAZELCAST_CACHE_MANAGER); + properties.add(HAZELCAST_CACHE_NAME); + properties.add(HAZELCAST_ENTRY_TTL); + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties); + } + + private volatile HazelcastCache cache = null; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final HazelcastCacheManager hazelcastCacheManager = context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class); + cache = hazelcastCacheManager.getCache( + context.getProperty(HAZELCAST_CACHE_NAME).getValue(), + context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS)); + getLogger().debug("Enable Hazelcast cache client for cache " + cache.name()); + } + + @OnDisabled + public void onDisabled() { + // The cache state will be preserved until the Service is not stopped! + getLogger().debug("Disable Hazelcast cache client for cache " + cache.name()); + cache = null; + } + + @Override + public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { + final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); + return (result == null) ? null : new AtomicCacheEntry<>(key, parsePayload(valueDeserializer, result), parseVersion(result)); + } + + @Override + public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + if (entry.getKey() == null) { + return false; + } + + final String key = getCacheEntryKey(entry.getKey(), keySerializer); + + try(final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) { + final byte[] oldValue = cache.get(key); + + if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_VERSION)) { + cache.put(key, serialize(entry.getValue(), valueSerializer, STARTING_VERSION)); + getLogger().debug("Entry with key " + key + " was added during replace"); + return true; + } else if (oldValue != null && entry.getRevision().get() == parseVersion(oldValue)) { + cache.put(key, serialize(entry.getValue(), valueSerializer, entry.getRevision().get() + 1)); + getLogger().debug("Entry with key " + key + " was updated during replace, with revision " + entry.getRevision().get() + 1); + return true; + } + } + + return false; + } + + @Override + public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + return cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_VERSION)) == null; + } + + @Override + public <K, V> V getAndPutIfAbsent( + final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer + ) throws IOException { + final byte[] result = cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_VERSION)); + return (result == null) ? null : parsePayload(valueDeserializer, result); + } + + @Override + public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { + return cache.contains(getCacheEntryKey(key, keySerializer)); + } + + @Override + public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + cache.put(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_VERSION)); + } + + @Override + public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { + final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); + return result == null ? null : parsePayload(valueDeserializer, result); + } + + @Override + public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException { + return cache.remove(getCacheEntryKey(key, keySerializer)); + } + + @Override + public long removeByPattern(final String regex) throws IOException { + return cache.removeAll(new RegexPredicate(regex)); + } + + private static class RegexPredicate implements Predicate<String>, Serializable { + private final Pattern pattern; + + private RegexPredicate(final String regex) { + this.pattern = Pattern.compile(regex); + } + + @Override + public boolean test(final String string) { + return pattern.matcher(string).matches(); + } + } + + @Override + public void close() throws IOException { + getLogger().debug("Closing " + this.getClass().getSimpleName()); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + private static long parseVersion(final byte[] value) { + return LongUtil.fromPaddedBytes(Arrays.copyOfRange(value, 0, Long.BYTES)); + } + + private static <V> V parsePayload(final Deserializer<V> deserializer, final byte[] value) throws IOException { + return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, value.length)); + } + + private <S> String getCacheEntryKey(final S value, final Serializer<S> serializer) throws IOException { + final String result; + + if (value instanceof String) { + result = (String) value; + } else { + final ByteArrayOutputStream stream = new ByteArrayOutputStream(); + serializer.serialize(value, stream); + result = stream.toString("UTF-8"); + } + + if (result.isEmpty()) { + throw new IOException("Cache record key cannot be empty!"); + } + + return result; + } + + /** + * Serializes incoming value using the given serializer. The end result is a byte array might be parsed by this + * implementation. As a convention, the first eight bytes of the array contains a long value, serves as version + * identifier. From the ninth byte, the actual value starts. During parsing this, the version must be read every + * time as convention. Review comment: ```suggestion * Serializes a value using the given serializer. The first eight bytes of the array contains the revision. * The rest holds the actual serialized value. ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/IMapBasedHazelcastCacheManager.java ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cache.IMapBasedHazelcastCache; +import org.apache.nifi.reporting.InitializationException; + +abstract class IMapBasedHazelcastCacheManager extends AbstractControllerService implements HazelcastCacheManager { + protected static final String ADDRESS_SEPARATOR = ","; + + public static final PropertyDescriptor HAZELCAST_CLUSTER_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cluster-name") + .displayName("Hazelcast Cluster Name") + .description("Name of the Hazelcast instance's cluster.") + .defaultValue("nifi") // Hazelcast's default is "dev", "nifi" overwrites this. + .required(false) + .addValidator(Validator.VALID) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private volatile HazelcastInstance instance; + + @Override + public HazelcastCache getCache(final String name, final long ttlInMillis) { + return new IMapBasedHazelcastCache(name, ttlInMillis, instance.getMap(name)); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + try { + instance = getInstance(context); + } catch (final Exception e) { + getLogger().error("Could not initialize Hazelcast connection. Reason: " + e.getMessage(), e); Review comment: ```suggestion getLogger().error("Could not create Hazelcast instance. Reason: " + e.getMessage(), e); ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. Review comment: ```suggestion * Note: By design, the client should not directly depend on Hazelcast specific classes to allow easy version and implementation changes. ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder() + .name("hazelcast-entry-ttl") + .displayName("Hazelcast entry TTL") Review comment: ```suggestion .displayName("Hazelcast Entry Lifetime") ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") Review comment: ```suggestion .description("A Hazelcast Cache Manager which manages connections to Hazelcast and provides cache instances") ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cache; + +import com.hazelcast.map.IMap; +import com.hazelcast.map.ReachedMaxSizeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +/** + * Implementation of {@link HazelcastCache} backed by Hazelcast's IMap data structure. It's purpose is to wrap Hazelcast implementation specific details in order to + * make it possible to easily change version or data structure. + */ +public class IMapBasedHazelcastCache implements HazelcastCache { + private static final Logger LOGGER = LoggerFactory.getLogger(IMapBasedHazelcastCache.class); + + private final String name; + private final long ttlInMillis; + private final IMap<String, byte[]> repository; + + /** + * @param name Name of the cache stored for identification. + * @param ttlInMillis The guaranteed lifetime of a cache entry in milliseconds. + * @param repository Reference to the actual storage. It should be the IMap with the same identifier as cache name. + */ + public IMapBasedHazelcastCache( + final String name, + final long ttlInMillis, + final IMap<String, byte[]> repository) { + this.name = name; + this.ttlInMillis = ttlInMillis; + this.repository = repository; + } + + @Override + public String name() { + return name; + } + + @Override + public byte[] get(final String key) { + return repository.get(key); + } + + @Override + public byte[] putIfAbsent(final String key, final byte[] value) { + return repository.putIfAbsent(key, value, ttlInMillis, TimeUnit.MILLISECONDS); + } + + @Override + public boolean put(final String key, final byte[] value) { + try { + repository.put(key, value, ttlInMillis, TimeUnit.MILLISECONDS); + return true; + } catch (final ReachedMaxSizeException e) { + LOGGER.error("Cache {} reached the maximum allowed size!", name); + return false; + } + } + + @Override + public boolean contains(final String key) { + return repository.containsKey(key); + } + + @Override + public boolean remove(final String key) { + return repository.remove(key) != null; + } + + @Override + public int removeAll(final Predicate<String> keyMatcher) { + // Note: the Hazelcast IMap provides support for predicate based <code>removeAll</code> method, but it neither atomic or provides information about the number of deleted items. + final Set<String> keys = repository.keySet(); + int result = 0; + + for (final String key : keys) { + if (keyMatcher.test(key)) { + repository.remove(key); Review comment: ```suggestion repository.delete(key); ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder() + .name("hazelcast-entry-ttl") + .displayName("Hazelcast entry TTL") + .description("Indicates how long the written entries should exist in Hazelcast. Setting it to '0 secs' means that the data" + + "will exists until it's deletion or until the Hazelcast server is shut down.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 secs") // Note: in case of Hazelcast IMap, negative value would mean "map default" which might be overridden from a different client. Review comment: ```suggestion .defaultValue("0 secs") // Note: in case of Hazelcast IMap, negative value would mean "map default" which might be overridden by a different client. ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NetworkConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +@Tags({"hazelcast", "cache"}) +@CapabilityDescription("A service that provides connections to an embedded Hazelcast instance started by NiFi." + + " The server does not asks for authentication, it is suggested to run it within secured network.") +public class EmbeddedHazelcastCacheManager extends IMapBasedHazelcastCacheManager { + + private static final int DEFAULT_HAZELCAST_PORT = 5701; + + private static final AllowableValue HA_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," + + " every node has access only the data stored within that node."); + private static final AllowableValue HA_CLUSTER = new AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the NiFi cluster:" + + " It expects every NiFi nodes to have a running Hazelcast instance on the same port as specified in the Hazelcast Port property. No explicit listing of the" + + " instances is needed."); + private static final AllowableValue HA_EXPLICIT = new AllowableValue("explicit", "Explicit", "Works with an explicit list of Hazelcast instances," + + " creating cluster using those. This provides greater control over the used servers, making it possible to utilize only a number of nodes as Hazelcast server." + + " The list of Hazelcast instances takes place in property \"Hazelcast Instances\"."); + + private static final PropertyDescriptor HAZELCAST_PORT = new PropertyDescriptor.Builder() + .name("hazelcast-port") + .displayName("Hazelcast Port") + .description("Port the Hazelcast uses as starting port. If not specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT + ".") + .required(false) + .addValidator(StandardValidators.PORT_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private static final PropertyDescriptor HAZELCAST_HA_MODE = new PropertyDescriptor.Builder() + .name("hazelcast-ha-mode") + .displayName("Hazelcast High Availability Mode") + .description("Specifies in what strategy the Hazelcast cluster should be created.") + .required(true) + .allowableValues(HA_NONE, HA_CLUSTER, HA_EXPLICIT) + .defaultValue(HA_NONE.getValue()) // None is used for default in order to be valid with standalone NiFi. + .build(); + + private static final PropertyDescriptor HAZELCAST_INSTANCES = new PropertyDescriptor.Builder() + .name("hazelcast-instances") + .displayName("Hazelcast Instances") + .description("List of Hazelcast instances should be part of the cluster, using {host:port} format separated the instances by comma." + + " Only used when high availability mode is set to \"Explicit\". The list must contain every instances will be part of the cluster.") + .required(false) + .addValidator(StandardValidators.URI_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; + + static { + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + HAZELCAST_CLUSTER_NAME, + HAZELCAST_PORT, + HAZELCAST_HA_MODE, + HAZELCAST_INSTANCES + )); + } + + @Override + protected HazelcastInstance getInstance(final ConfigurationContext context) { + final String instanceName = UUID.randomUUID().toString(); + final Config config = new Config(instanceName); + final NetworkConfig networkConfig = config.getNetworkConfig(); + final TcpIpConfig tcpIpConfig = networkConfig.getJoin().getTcpIpConfig(); + final String haMode = context.getProperty(HAZELCAST_HA_MODE).getValue(); + + if (context.getProperty(HAZELCAST_CLUSTER_NAME).isSet()) { + config.setClusterName(context.getProperty(HAZELCAST_CLUSTER_NAME).evaluateAttributeExpressions().getValue()); + } + + final int port = context.getProperty(HAZELCAST_PORT).isSet() + ? context.getProperty(HAZELCAST_PORT).evaluateAttributeExpressions().asInteger() + : DEFAULT_HAZELCAST_PORT; Review comment: ```suggestion final int port = context.getProperty(HAZELCAST_PORT).evaluateAttributeExpressions().asInteger(); ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NetworkConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +@Tags({"hazelcast", "cache"}) +@CapabilityDescription("A service that provides connections to an embedded Hazelcast instance started by NiFi." + + " The server does not asks for authentication, it is suggested to run it within secured network.") +public class EmbeddedHazelcastCacheManager extends IMapBasedHazelcastCacheManager { + + private static final int DEFAULT_HAZELCAST_PORT = 5701; + + private static final AllowableValue HA_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," + + " every node has access only the data stored within that node."); + private static final AllowableValue HA_CLUSTER = new AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the NiFi cluster:" + + " It expects every NiFi nodes to have a running Hazelcast instance on the same port as specified in the Hazelcast Port property. No explicit listing of the" + + " instances is needed."); + private static final AllowableValue HA_EXPLICIT = new AllowableValue("explicit", "Explicit", "Works with an explicit list of Hazelcast instances," + + " creating cluster using those. This provides greater control over the used servers, making it possible to utilize only a number of nodes as Hazelcast server." + + " The list of Hazelcast instances takes place in property \"Hazelcast Instances\"."); + + private static final PropertyDescriptor HAZELCAST_PORT = new PropertyDescriptor.Builder() + .name("hazelcast-port") + .displayName("Hazelcast Port") + .description("Port the Hazelcast uses as starting port. If not specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT + ".") + .required(false) + .addValidator(StandardValidators.PORT_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private static final PropertyDescriptor HAZELCAST_HA_MODE = new PropertyDescriptor.Builder() + .name("hazelcast-ha-mode") + .displayName("Hazelcast High Availability Mode") + .description("Specifies in what strategy the Hazelcast cluster should be created.") Review comment: ```suggestion .description("Specifies with what strategy the Hazelcast cluster should be created.") ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder() + .name("hazelcast-entry-ttl") + .displayName("Hazelcast entry TTL") + .description("Indicates how long the written entries should exist in Hazelcast. Setting it to '0 secs' means that the data" + + "will exists until it's deletion or until the Hazelcast server is shut down.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 secs") // Note: in case of Hazelcast IMap, negative value would mean "map default" which might be overridden from a different client. + .build(); + + private static final long STARTING_VERSION = 1; Review comment: ```suggestion private static final long STARTING_REVISION = 1; ``` In general for consistency's sake `revision` could be better than `version` in other places as well. ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NetworkConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +@Tags({"hazelcast", "cache"}) +@CapabilityDescription("A service that provides connections to an embedded Hazelcast instance started by NiFi." + + " The server does not asks for authentication, it is suggested to run it within secured network.") +public class EmbeddedHazelcastCacheManager extends IMapBasedHazelcastCacheManager { + + private static final int DEFAULT_HAZELCAST_PORT = 5701; + + private static final AllowableValue HA_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," + + " every node has access only the data stored within that node."); + private static final AllowableValue HA_CLUSTER = new AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the NiFi cluster:" + + " It expects every NiFi nodes to have a running Hazelcast instance on the same port as specified in the Hazelcast Port property. No explicit listing of the" + + " instances is needed."); + private static final AllowableValue HA_EXPLICIT = new AllowableValue("explicit", "Explicit", "Works with an explicit list of Hazelcast instances," + + " creating cluster using those. This provides greater control over the used servers, making it possible to utilize only a number of nodes as Hazelcast server." + + " The list of Hazelcast instances takes place in property \"Hazelcast Instances\"."); + + private static final PropertyDescriptor HAZELCAST_PORT = new PropertyDescriptor.Builder() + .name("hazelcast-port") + .displayName("Hazelcast Port") + .description("Port the Hazelcast uses as starting port. If not specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT + ".") + .required(false) Review comment: ```suggestion .required(true) .defaultValue(DEFAULT_HAZELCAST_PORT) ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder() + .name("hazelcast-entry-ttl") + .displayName("Hazelcast entry TTL") + .description("Indicates how long the written entries should exist in Hazelcast. Setting it to '0 secs' means that the data" + + "will exists until it's deletion or until the Hazelcast server is shut down.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 secs") // Note: in case of Hazelcast IMap, negative value would mean "map default" which might be overridden from a different client. + .build(); + + private static final long STARTING_VERSION = 1; + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; + + static { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HAZELCAST_CACHE_MANAGER); + properties.add(HAZELCAST_CACHE_NAME); + properties.add(HAZELCAST_ENTRY_TTL); + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties); + } + + private volatile HazelcastCache cache = null; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final HazelcastCacheManager hazelcastCacheManager = context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class); + cache = hazelcastCacheManager.getCache( + context.getProperty(HAZELCAST_CACHE_NAME).getValue(), + context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS)); + getLogger().debug("Enable Hazelcast cache client for cache " + cache.name()); + } + + @OnDisabled + public void onDisabled() { + // The cache state will be preserved until the Service is not stopped! + getLogger().debug("Disable Hazelcast cache client for cache " + cache.name()); + cache = null; + } + + @Override + public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { + final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); + return (result == null) ? null : new AtomicCacheEntry<>(key, parsePayload(valueDeserializer, result), parseVersion(result)); + } + + @Override + public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + if (entry.getKey() == null) { + return false; + } + + final String key = getCacheEntryKey(entry.getKey(), keySerializer); + + try(final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) { + final byte[] oldValue = cache.get(key); + + if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_VERSION)) { + cache.put(key, serialize(entry.getValue(), valueSerializer, STARTING_VERSION)); + getLogger().debug("Entry with key " + key + " was added during replace"); + return true; + } else if (oldValue != null && entry.getRevision().get() == parseVersion(oldValue)) { + cache.put(key, serialize(entry.getValue(), valueSerializer, entry.getRevision().get() + 1)); + getLogger().debug("Entry with key " + key + " was updated during replace, with revision " + entry.getRevision().get() + 1); + return true; + } + } + + return false; + } + + @Override + public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + return cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_VERSION)) == null; + } + + @Override + public <K, V> V getAndPutIfAbsent( + final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer + ) throws IOException { + final byte[] result = cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_VERSION)); + return (result == null) ? null : parsePayload(valueDeserializer, result); + } + + @Override + public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { + return cache.contains(getCacheEntryKey(key, keySerializer)); + } + + @Override + public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + cache.put(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_VERSION)); + } + + @Override + public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { + final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); + return result == null ? null : parsePayload(valueDeserializer, result); + } + + @Override + public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException { + return cache.remove(getCacheEntryKey(key, keySerializer)); + } + + @Override + public long removeByPattern(final String regex) throws IOException { + return cache.removeAll(new RegexPredicate(regex)); + } + + private static class RegexPredicate implements Predicate<String>, Serializable { + private final Pattern pattern; + + private RegexPredicate(final String regex) { + this.pattern = Pattern.compile(regex); + } + + @Override + public boolean test(final String string) { + return pattern.matcher(string).matches(); + } + } + + @Override + public void close() throws IOException { + getLogger().debug("Closing " + this.getClass().getSimpleName()); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + private static long parseVersion(final byte[] value) { + return LongUtil.fromPaddedBytes(Arrays.copyOfRange(value, 0, Long.BYTES)); + } + + private static <V> V parsePayload(final Deserializer<V> deserializer, final byte[] value) throws IOException { + return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, value.length)); + } + + private <S> String getCacheEntryKey(final S value, final Serializer<S> serializer) throws IOException { Review comment: ```suggestion private <S> String getCacheEntryKey(final S key, final Serializer<S> serializer) throws IOException { ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder() + .name("hazelcast-entry-ttl") + .displayName("Hazelcast entry TTL") + .description("Indicates how long the written entries should exist in Hazelcast. Setting it to '0 secs' means that the data" + + "will exists until it's deletion or until the Hazelcast server is shut down.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 secs") // Note: in case of Hazelcast IMap, negative value would mean "map default" which might be overridden from a different client. + .build(); + + private static final long STARTING_VERSION = 1; + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; + + static { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HAZELCAST_CACHE_MANAGER); + properties.add(HAZELCAST_CACHE_NAME); + properties.add(HAZELCAST_ENTRY_TTL); + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties); + } + + private volatile HazelcastCache cache = null; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final HazelcastCacheManager hazelcastCacheManager = context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class); + cache = hazelcastCacheManager.getCache( + context.getProperty(HAZELCAST_CACHE_NAME).getValue(), + context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS)); + getLogger().debug("Enable Hazelcast cache client for cache " + cache.name()); + } + + @OnDisabled + public void onDisabled() { + // The cache state will be preserved until the Service is not stopped! + getLogger().debug("Disable Hazelcast cache client for cache " + cache.name()); + cache = null; + } + + @Override + public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { + final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); + return (result == null) ? null : new AtomicCacheEntry<>(key, parsePayload(valueDeserializer, result), parseVersion(result)); + } + + @Override + public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + if (entry.getKey() == null) { + return false; + } + + final String key = getCacheEntryKey(entry.getKey(), keySerializer); + + try(final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) { + final byte[] oldValue = cache.get(key); + + if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_VERSION)) { + cache.put(key, serialize(entry.getValue(), valueSerializer, STARTING_VERSION)); + getLogger().debug("Entry with key " + key + " was added during replace"); + return true; + } else if (oldValue != null && entry.getRevision().get() == parseVersion(oldValue)) { + cache.put(key, serialize(entry.getValue(), valueSerializer, entry.getRevision().get() + 1)); + getLogger().debug("Entry with key " + key + " was updated during replace, with revision " + entry.getRevision().get() + 1); Review comment: Not entirely sure but to my understanding, according to the interface, `replace` should _not_ change the revision. ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder() + .name("hazelcast-entry-ttl") + .displayName("Hazelcast entry TTL") + .description("Indicates how long the written entries should exist in Hazelcast. Setting it to '0 secs' means that the data" + + "will exists until it's deletion or until the Hazelcast server is shut down.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 secs") // Note: in case of Hazelcast IMap, negative value would mean "map default" which might be overridden from a different client. + .build(); + + private static final long STARTING_VERSION = 1; + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; + + static { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HAZELCAST_CACHE_MANAGER); + properties.add(HAZELCAST_CACHE_NAME); + properties.add(HAZELCAST_ENTRY_TTL); + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties); + } + + private volatile HazelcastCache cache = null; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final HazelcastCacheManager hazelcastCacheManager = context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class); + cache = hazelcastCacheManager.getCache( + context.getProperty(HAZELCAST_CACHE_NAME).getValue(), + context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS)); + getLogger().debug("Enable Hazelcast cache client for cache " + cache.name()); + } + + @OnDisabled + public void onDisabled() { + // The cache state will be preserved until the Service is not stopped! + getLogger().debug("Disable Hazelcast cache client for cache " + cache.name()); + cache = null; + } + + @Override + public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { + final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); + return (result == null) ? null : new AtomicCacheEntry<>(key, parsePayload(valueDeserializer, result), parseVersion(result)); + } + + @Override + public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + if (entry.getKey() == null) { + return false; + } + + final String key = getCacheEntryKey(entry.getKey(), keySerializer); + + try(final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) { + final byte[] oldValue = cache.get(key); + + if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_VERSION)) { + cache.put(key, serialize(entry.getValue(), valueSerializer, STARTING_VERSION)); + getLogger().debug("Entry with key " + key + " was added during replace"); + return true; + } else if (oldValue != null && entry.getRevision().get() == parseVersion(oldValue)) { Review comment: ```suggestion } else if (oldValue != null && Objects.equals(entry.getRevision().get(), parseVersion(oldValue))) { ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/IMapBasedHazelcastCacheManager.java ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cache.IMapBasedHazelcastCache; +import org.apache.nifi.reporting.InitializationException; + +abstract class IMapBasedHazelcastCacheManager extends AbstractControllerService implements HazelcastCacheManager { + protected static final String ADDRESS_SEPARATOR = ","; + + public static final PropertyDescriptor HAZELCAST_CLUSTER_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cluster-name") + .displayName("Hazelcast Cluster Name") + .description("Name of the Hazelcast instance's cluster.") Review comment: ```suggestion .description("Name of the Hazelcast cluster.") ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder() + .name("hazelcast-entry-ttl") + .displayName("Hazelcast entry TTL") + .description("Indicates how long the written entries should exist in Hazelcast. Setting it to '0 secs' means that the data" + + "will exists until it's deletion or until the Hazelcast server is shut down.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 secs") // Note: in case of Hazelcast IMap, negative value would mean "map default" which might be overridden from a different client. + .build(); + + private static final long STARTING_VERSION = 1; + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; + + static { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HAZELCAST_CACHE_MANAGER); + properties.add(HAZELCAST_CACHE_NAME); + properties.add(HAZELCAST_ENTRY_TTL); + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties); + } + + private volatile HazelcastCache cache = null; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final HazelcastCacheManager hazelcastCacheManager = context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class); + cache = hazelcastCacheManager.getCache( + context.getProperty(HAZELCAST_CACHE_NAME).getValue(), Review comment: ```suggestion context.getProperty(HAZELCAST_CACHE_NAME).evaluateAttributeExpressions().getValue(), ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NetworkConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +@Tags({"hazelcast", "cache"}) +@CapabilityDescription("A service that provides connections to an embedded Hazelcast instance started by NiFi." + + " The server does not asks for authentication, it is suggested to run it within secured network.") +public class EmbeddedHazelcastCacheManager extends IMapBasedHazelcastCacheManager { + + private static final int DEFAULT_HAZELCAST_PORT = 5701; + + private static final AllowableValue HA_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," + + " every node has access only the data stored within that node."); + private static final AllowableValue HA_CLUSTER = new AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the NiFi cluster:" + + " It expects every NiFi nodes to have a running Hazelcast instance on the same port as specified in the Hazelcast Port property. No explicit listing of the" + + " instances is needed."); + private static final AllowableValue HA_EXPLICIT = new AllowableValue("explicit", "Explicit", "Works with an explicit list of Hazelcast instances," + + " creating cluster using those. This provides greater control over the used servers, making it possible to utilize only a number of nodes as Hazelcast server." + + " The list of Hazelcast instances takes place in property \"Hazelcast Instances\"."); Review comment: ```suggestion private static final AllowableValue HA_EXPLICIT = new AllowableValue("explicit", "Explicit", "Works with an explicit list of Hazelcast instances," + " creating a cluster using those. This provides greater control, making it possible to utilize only certain nodes as Hazelcast servers." + " The list of Hazelcast instances can be set in the property \"Hazelcast Instances\"."); ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cacheclient; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hazelcast.services.cache.HazelcastCache; +import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager; +import org.apache.nifi.hazelcast.services.util.LongUtil; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. + * + * Note: By design, the client should not directly depend on Hazelcast specific classes due to ease version and implementation changes. + */ +@Tags({ "hazelcast", "cache", "map"}) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + + "an abstracted repository manages the actual Hazelcast calls, provided by HazelcastConnectionService.") +public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { + + public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder() + .name("hazelcast-cache-manager") + .displayName("Hazelcast Cache Manager") + .description("A Hazelcast Cache Manager which manages connections to Hazelcast and providing cache instances") + .identifiesControllerService(HazelcastCacheManager.class) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder() + .name("hazelcast-cache-name") + .displayName("Hazelcast Cache Name") + .description("The name of a given repository. Within a Hazelcast cluster, multiple unrelated caches might be used." + + "Clients using the same cache name will depend on the same data structure.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) Review comment: ```suggestion .addValidator(StandardValidators.NON_BLANK_VALIDATOR) ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NetworkConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +@Tags({"hazelcast", "cache"}) +@CapabilityDescription("A service that provides connections to an embedded Hazelcast instance started by NiFi." + + " The server does not asks for authentication, it is suggested to run it within secured network.") +public class EmbeddedHazelcastCacheManager extends IMapBasedHazelcastCacheManager { + + private static final int DEFAULT_HAZELCAST_PORT = 5701; + + private static final AllowableValue HA_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," + + " every node has access only the data stored within that node."); + private static final AllowableValue HA_CLUSTER = new AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the NiFi cluster:" + + " It expects every NiFi nodes to have a running Hazelcast instance on the same port as specified in the Hazelcast Port property. No explicit listing of the" + + " instances is needed."); + private static final AllowableValue HA_EXPLICIT = new AllowableValue("explicit", "Explicit", "Works with an explicit list of Hazelcast instances," + + " creating cluster using those. This provides greater control over the used servers, making it possible to utilize only a number of nodes as Hazelcast server." + + " The list of Hazelcast instances takes place in property \"Hazelcast Instances\"."); + + private static final PropertyDescriptor HAZELCAST_PORT = new PropertyDescriptor.Builder() + .name("hazelcast-port") + .displayName("Hazelcast Port") + .description("Port the Hazelcast uses as starting port. If not specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT + ".") + .required(false) + .addValidator(StandardValidators.PORT_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private static final PropertyDescriptor HAZELCAST_HA_MODE = new PropertyDescriptor.Builder() + .name("hazelcast-ha-mode") + .displayName("Hazelcast High Availability Mode") + .description("Specifies in what strategy the Hazelcast cluster should be created.") + .required(true) + .allowableValues(HA_NONE, HA_CLUSTER, HA_EXPLICIT) + .defaultValue(HA_NONE.getValue()) // None is used for default in order to be valid with standalone NiFi. + .build(); + + private static final PropertyDescriptor HAZELCAST_INSTANCES = new PropertyDescriptor.Builder() + .name("hazelcast-instances") + .displayName("Hazelcast Instances") + .description("List of Hazelcast instances should be part of the cluster, using {host:port} format separated the instances by comma." + + " Only used when high availability mode is set to \"Explicit\". The list must contain every instances will be part of the cluster.") Review comment: ```suggestion .description("List of Hazelcast instances that should be part of the cluster. Instances are using {host:port} format, separated by comma." + " Only used when high availability mode is set to \"Explicit\". The list must contain every instances that will be part of the cluster.") ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NetworkConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +@Tags({"hazelcast", "cache"}) +@CapabilityDescription("A service that provides connections to an embedded Hazelcast instance started by NiFi." + + " The server does not asks for authentication, it is suggested to run it within secured network.") +public class EmbeddedHazelcastCacheManager extends IMapBasedHazelcastCacheManager { + + private static final int DEFAULT_HAZELCAST_PORT = 5701; + + private static final AllowableValue HA_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," + + " every node has access only the data stored within that node."); + private static final AllowableValue HA_CLUSTER = new AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the NiFi cluster:" + + " It expects every NiFi nodes to have a running Hazelcast instance on the same port as specified in the Hazelcast Port property. No explicit listing of the" + + " instances is needed."); + private static final AllowableValue HA_EXPLICIT = new AllowableValue("explicit", "Explicit", "Works with an explicit list of Hazelcast instances," + + " creating cluster using those. This provides greater control over the used servers, making it possible to utilize only a number of nodes as Hazelcast server." + + " The list of Hazelcast instances takes place in property \"Hazelcast Instances\"."); + + private static final PropertyDescriptor HAZELCAST_PORT = new PropertyDescriptor.Builder() + .name("hazelcast-port") + .displayName("Hazelcast Port") + .description("Port the Hazelcast uses as starting port. If not specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT + ".") Review comment: ```suggestion .description("Port for the Hazelcast instance to use. If not specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT + ".") ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NetworkConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +@Tags({"hazelcast", "cache"}) +@CapabilityDescription("A service that provides connections to an embedded Hazelcast instance started by NiFi." + + " The server does not asks for authentication, it is suggested to run it within secured network.") +public class EmbeddedHazelcastCacheManager extends IMapBasedHazelcastCacheManager { + + private static final int DEFAULT_HAZELCAST_PORT = 5701; + + private static final AllowableValue HA_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," + + " every node has access only the data stored within that node."); + private static final AllowableValue HA_CLUSTER = new AllowableValue("cluster", "Cluster", "Creates Hazelcast cluster based on the NiFi cluster:" + + " It expects every NiFi nodes to have a running Hazelcast instance on the same port as specified in the Hazelcast Port property. No explicit listing of the" + + " instances is needed."); + private static final AllowableValue HA_EXPLICIT = new AllowableValue("explicit", "Explicit", "Works with an explicit list of Hazelcast instances," + + " creating cluster using those. This provides greater control over the used servers, making it possible to utilize only a number of nodes as Hazelcast server." + + " The list of Hazelcast instances takes place in property \"Hazelcast Instances\"."); + + private static final PropertyDescriptor HAZELCAST_PORT = new PropertyDescriptor.Builder() + .name("hazelcast-port") + .displayName("Hazelcast Port") + .description("Port the Hazelcast uses as starting port. If not specified, the default value will be used, which is " + DEFAULT_HAZELCAST_PORT + ".") + .required(false) + .addValidator(StandardValidators.PORT_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private static final PropertyDescriptor HAZELCAST_HA_MODE = new PropertyDescriptor.Builder() + .name("hazelcast-ha-mode") + .displayName("Hazelcast High Availability Mode") + .description("Specifies in what strategy the Hazelcast cluster should be created.") + .required(true) + .allowableValues(HA_NONE, HA_CLUSTER, HA_EXPLICIT) + .defaultValue(HA_NONE.getValue()) // None is used for default in order to be valid with standalone NiFi. + .build(); + + private static final PropertyDescriptor HAZELCAST_INSTANCES = new PropertyDescriptor.Builder() + .name("hazelcast-instances") + .displayName("Hazelcast Instances") + .description("List of Hazelcast instances should be part of the cluster, using {host:port} format separated the instances by comma." + + " Only used when high availability mode is set to \"Explicit\". The list must contain every instances will be part of the cluster.") + .required(false) + .addValidator(StandardValidators.URI_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; + + static { + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + HAZELCAST_CLUSTER_NAME, + HAZELCAST_PORT, + HAZELCAST_HA_MODE, + HAZELCAST_INSTANCES + )); + } + + @Override + protected HazelcastInstance getInstance(final ConfigurationContext context) { + final String instanceName = UUID.randomUUID().toString(); + final Config config = new Config(instanceName); + final NetworkConfig networkConfig = config.getNetworkConfig(); + final TcpIpConfig tcpIpConfig = networkConfig.getJoin().getTcpIpConfig(); + final String haMode = context.getProperty(HAZELCAST_HA_MODE).getValue(); + + if (context.getProperty(HAZELCAST_CLUSTER_NAME).isSet()) { + config.setClusterName(context.getProperty(HAZELCAST_CLUSTER_NAME).evaluateAttributeExpressions().getValue()); + } + + final int port = context.getProperty(HAZELCAST_PORT).isSet() + ? context.getProperty(HAZELCAST_PORT).evaluateAttributeExpressions().asInteger() + : DEFAULT_HAZELCAST_PORT; + + // It high availability is turned off, we turn of the capability for the Hazelcast instance to form a cluster. Review comment: ```suggestion // It high availability is turned off, we turn off the capability of the Hazelcast instance to form a cluster. ``` ########## File path: nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hazelcast.services.cachemanager; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NetworkConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +@Tags({"hazelcast", "cache"}) +@CapabilityDescription("A service that provides connections to an embedded Hazelcast instance started by NiFi." + + " The server does not asks for authentication, it is suggested to run it within secured network.") +public class EmbeddedHazelcastCacheManager extends IMapBasedHazelcastCacheManager { + + private static final int DEFAULT_HAZELCAST_PORT = 5701; + + private static final AllowableValue HA_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," + + " every node has access only the data stored within that node."); Review comment: ```suggestion private static final AllowableValue HA_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," + " every node has access only to the data stored locally."); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
