[ 
https://issues.apache.org/jira/browse/NIFI-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16454231#comment-16454231
 ] 

ASF GitHub Bot commented on NIFI-5051:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2615#discussion_r184394354
  
    --- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
 ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.lookup.LookupFailureException;
    +import org.apache.nifi.lookup.LookupService;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.SchemaIdentifier;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +public class ElasticSearchLookupService extends AbstractControllerService 
implements LookupService {
    +    public static final PropertyDescriptor CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
    +        .name("el-rest-client-service")
    +        .displayName("Client Service")
    +        .description("An ElasticSearch client service to use for running 
queries.")
    +        .identifiesControllerService(ElasticSearchClientService.class)
    +        .required(true)
    +        .build();
    +    public static final PropertyDescriptor INDEX = new 
PropertyDescriptor.Builder()
    +        .name("el-lookup-index")
    +        .displayName("Index")
    +        .description("The name of the index to read from")
    +        .required(true)
    +        
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TYPE = new 
PropertyDescriptor.Builder()
    +        .name("el-lookup-type")
    +        .displayName("Type")
    +        .description("The type of this document (used by Elasticsearch for 
indexing and searching)")
    +        .required(false)
    +        
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor SCHEMA_REGISTRY = new 
PropertyDescriptor.Builder()
    +        .name("el-lookup-schema-registry")
    +        .displayName("Schema Registry")
    +        .description("If specified, this avro schema will be used for all 
objects loaded from MongoDB using this service. If left blank, " +
    +                "the service will attempt to determine the schema from the 
results.")
    +        .required(false)
    +        .identifiesControllerService(SchemaRegistry.class)
    +        .build();
    +
    +    public static final PropertyDescriptor RECORD_SCHEMA_NAME = new 
PropertyDescriptor.Builder()
    +        .name("el-lookup-record-schema-name")
    +        .displayName("Record Schema Name")
    +        .description("If specified, the value will be used to lookup a 
schema in the configured schema registry.")
    +        .required(false)
    +        .addValidator(Validator.VALID)
    +        .build();
    +
    +    static final List<PropertyDescriptor> lookupDescriptors;
    +
    +    static {
    +        List<PropertyDescriptor> _desc = new ArrayList<>();
    +        _desc.add(CLIENT_SERVICE);
    +        _desc.add(INDEX);
    +        _desc.add(TYPE);
    +        _desc.add(SCHEMA_REGISTRY);
    +        _desc.add(RECORD_SCHEMA_NAME);
    +
    +        lookupDescriptors = Collections.unmodifiableList(_desc);
    +    }
    +
    +    private ElasticSearchClientService clientService;
    +
    +    private String index;
    +    private String type;
    +    private ObjectMapper mapper;
    +    private RecordSchema recordSchema;
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
    +        List<ValidationResult> problems = new ArrayList<>();
    +
    +        PropertyValue registry = 
validationContext.getProperty(SCHEMA_REGISTRY);
    +        PropertyValue schemaName = 
validationContext.getProperty(RECORD_SCHEMA_NAME);
    +
    +        if (registry.isSet() && !schemaName.isSet()) {
    +            problems.add(new ValidationResult.Builder()
    +                    .explanation("If the registry is set, the schema name 
parameter must be set too.")
    +                    .build());
    +        } else if (!registry.isSet() && schemaName.isSet()) {
    +            problems.add(new ValidationResult.Builder()
    +                    .explanation("If the schema name is set, the schema 
registry parameter must be set too.")
    +                    .build());
    +        }
    +
    +        return problems;
    +    }
    +
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
    +        clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +        index = context.getProperty(INDEX).getValue();
    +        type  = context.getProperty(TYPE).getValue();
    +        mapper = new ObjectMapper();
    +
    +        SchemaRegistry registry = 
context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
    +        final String name = 
context.getProperty(RECORD_SCHEMA_NAME).getValue();
    +        if (registry != null) {
    +            try {
    +                SchemaIdentifier identifier = SchemaIdentifier.builder()
    +                    .name(name)
    +                    .build();
    +                recordSchema = registry.retrieveSchema(identifier);
    +            } catch (Exception ex) {
    +                getLogger().error(String.format("Could not find schema 
named %s", name), ex);
    +                throw new InitializationException(ex);
    +            }
    +        }
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return lookupDescriptors;
    +    }
    +
    +    @Override
    +    public Optional lookup(Map coordinates) throws LookupFailureException {
    +        validateCoordinates(coordinates);
    +
    +        try {
    +            Record record;
    +            if (coordinates.containsKey("_id")) {
    +                record = getById((String)coordinates.get("_id"));
    +            } else {
    +                record = getByQuery((String)coordinates.get("query"));
    +            }
    +
    +            return record == null ? Optional.empty() : Optional.of(record);
    +        } catch (IOException ex) {
    +            getLogger().error("Error during lookup.", ex);
    +            throw new LookupFailureException(ex);
    +        }
    +    }
    +
    +    private void validateCoordinates(Map coordinates) throws 
LookupFailureException {
    +        List<String> reasons = new ArrayList<>();
    +
    +        if (coordinates.containsKey("_id") && !(coordinates.get("_id") 
instanceof String)) {
    +            reasons.add("_id was supplied, but it was not a String.");
    +        } else if (coordinates.containsKey("query") && 
!(coordinates.get("query") instanceof String)) {
    +            reasons.add("query was supplied, but it was not a String.");
    +        } else if (!coordinates.containsKey("_id") && 
!coordinates.containsKey("query")) {
    +            reasons.add("Either \"_id\" or \"query\" must be supplied as 
keys to lookup(Map)");
    +        } else if (coordinates.containsKey("_id") && 
coordinates.containsKey("query")) {
    +            reasons.add("\"_id\" and \"query\" cannot be used at the same 
time as keys.");
    +        }
    +
    +        if (reasons.size() > 0) {
    +            String error = String.join("\n", reasons);
    +            throw new LookupFailureException(error);
    +        }
    +    }
    +
    +    private Record getById(final String _id) throws IOException, 
LookupFailureException {
    +        Map<String, Object> query = new HashMap<String, Object>(){{
    +            put("query", new HashMap<String, Object>() {{
    +                put("match", new HashMap<String, String>(){{
    +                    put("_id", _id);
    +                }});
    +            }});
    +        }};
    +
    +        String json = mapper.writeValueAsString(query);
    +
    +        SearchResponse response = clientService.search(json, index, type);
    +
    +        if (response.getNumberOfHits() > 1) {
    +            throw new LookupFailureException(String.format("Expected 1 
response, got %d for query %s",
    +                response.getNumberOfHits(), json));
    +        } else if (response.getNumberOfHits() == 0) {
    +            return null;
    +        }
    +
    +        final Map<String, Object> source = 
(Map)response.getHits().get(0).get("_source");
    +
    +        RecordSchema toUse = recordSchema != null ? recordSchema : 
convertSchema(source);
    +
    +        return new MapRecord(toUse, source);
    +    }
    +
    +    private Record getByQuery(final String query) throws 
LookupFailureException {
    +        Map<String, Object> parsed;
    +        try {
    +            parsed = mapper.readValue(query, Map.class);
    +            parsed.remove("from");
    +            parsed.remove("aggs");
    +            parsed.put("size", 1);
    +
    +            final String json = mapper.writeValueAsString(parsed);
    +
    +            SearchResponse response = clientService.search(json, index, 
type);
    +
    +            if (response.getNumberOfHits() == 0) {
    +                return null;
    +            } else {
    +                final Map<String, Object> source = 
(Map)response.getHits().get(0).get("_source");
    +                RecordSchema toUse = recordSchema != null ? recordSchema : 
convertSchema(source);
    +                return new MapRecord(toUse, source);
    +            }
    +
    +        } catch (IOException e) {
    +            throw new LookupFailureException(e);
    +        }
    +    }
    +
    +    private RecordSchema convertSchema(Map<String, Object> result) {
    --- End diff --
    
    Should this live in a utilities class (unless one already provides such a 
method)? It seems generally useful for JSON-to-schema conversion (or any 
Map-to-schema conversion).


> Create a LookupService that uses ElasticSearch
> ----------------------------------------------
>
>                 Key: NIFI-5051
>                 URL: https://issues.apache.org/jira/browse/NIFI-5051
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Mike Thomsen
>            Assignee: Mike Thomsen
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to