[ 
https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433987#comment-16433987
 ] 

ASF GitHub Bot commented on NIFI-4982:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2558#discussion_r180771987
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java
 ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.lookup;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
    +import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +
    +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"})
    +@CapabilityDescription("Allows to choose a distributed map cache client to 
retrieve the value associated to a key. "
    +    + "The coordinates that are passed to the lookup must contain the key 
'key'.")
    +public class DistributedMapCacheLookupService extends 
AbstractControllerService implements StringLookupService {
    +    private static final String KEY = "key";
    +    private static final Set<String> REQUIRED_KEYS = 
Stream.of(KEY).collect(Collectors.toSet());
    +
    +    private volatile DistributedMapCacheClient cache;
    +    private final Serializer<String> keySerializer = new 
StringSerializer();
    +    private final Deserializer<String> valueDeserializer = new 
StringDeserializer();
    +
    +    public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE 
= new PropertyDescriptor.Builder()
    +            .name("distributed-map-cache-service")
    +            .displayName("Distributed Cache Service")
    +            .description("The Controller Service that is used to get the 
cached values.")
    +            .required(true)
    +            .identifiesControllerService(DistributedMapCacheClient.class)
    +            .build();
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .name(propertyDescriptorName)
    +            .required(false)
    +            .dynamic(true)
    +            .addValidator(Validator.VALID)
    +            .expressionLanguageSupported(true)
    +            .build();
    +    }
    +
    +    @OnEnabled
    +    public void cacheConfiguredValues(final ConfigurationContext context) {
    +        cache = 
context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE);
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Optional<String> lookup(final Map<String, Object> coordinates) {
    +        if (coordinates == null) {
    +            return Optional.empty();
    +        }
    +
    +        final String key = coordinates.get(KEY).toString();
    +        if (key == null) {
    +            return Optional.empty();
    +        }
    +
    +        try {
    +            return Optional.ofNullable(cache.get(key, keySerializer, 
valueDeserializer));
    +        } catch (IOException e) {
    +            getLogger().error("Error while trying to get the value from 
distributed map cache with key = " + key, e);
    +            return Optional.empty();
    +        }
    +    }
    +
    +    @Override
    +    public Set<String> getRequiredKeys() {
    +        return REQUIRED_KEYS;
    +    }
    +
    +    public static class StringDeserializer implements Deserializer<String> 
{
    --- End diff --
    
    Should these be moved up to a utilities class or API package?  Also, do we 
support character sets other than UTF-8?


> Add a DistributedMapCacheLookupService
> --------------------------------------
>
>                 Key: NIFI-4982
>                 URL: https://issues.apache.org/jira/browse/NIFI-4982
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Pierre Villard
>            Assignee: Pierre Villard
>            Priority: Major
>         Attachments: distributedMapCacheLookup.xml
>
>
> Add a new lookup controller service that takes a Distributed Map Cache client 
> to lookup for values based on a key. This allows users to leverage the 
> internal Disitributed Map Cache server and/or the Redis to perform lookup 
> access (with LookupRecord processor for instance).
> Attached is a template to test the PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to