Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/733#discussion_r79827737
  
    --- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
 ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.codehaus.jackson.JsonNode;
    +
    +import okhttp3.HttpUrl;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({ "elasticsearch", "query", "read", "get", "http" })
    +@CapabilityDescription("Queries Elasticsearch using the specified 
connection properties. "
    +        + "Note that the full body of each page of documents will be read 
into memory before being "
    +        + "written to Flow Files for transfer.  Also note that the 
Elasticsearch max_result_window index "
    +        + "setting is the upper bound on the number of records that can be 
retrieved using this query.  "
    +        + "To retrieve more records, use the ScrollElasticsearchHttp 
processor.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "The 
filename attribute is set to the document identifier"),
    +        @WritesAttribute(attribute = "es.index", description = "The 
Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The 
Elasticsearch document type"),
    +        @WritesAttribute(attribute = "es.result.*", description = "If 
Target is 'Flow file attributes', the JSON attributes of "
    +                + "each result will be placed into corresponding 
attributes with this prefix.") })
    +public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
    +
    +    private static final String FIELD_INCLUDE_QUERY_PARAM = 
"_source_include";
    +    private static final String QUERY_QUERY_PARAM = "q";
    +    private static final String SORT_QUERY_PARAM = "sort";
    +    private static final String FROM_QUERY_PARAM = "from";
    +    private static final String SIZE_QUERY_PARAM = "size";
    +
    +    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file 
content";
    +    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file 
attributes";
    +    private static final String ATTRIBUTE_PREFIX = "es.result.";
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description(
    +                    "All FlowFiles that are read from Elasticsearch are 
routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be read from Elasticsearch 
are routed to this relationship. Note that only incoming "
    +                            + "flow files will be routed to 
failure.").build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description(
    +                    "A FlowFile is routed to this relationship if the 
document cannot be fetched but attempting the operation again may "
    +                            + "succeed. Note that if the processor has no 
incoming connections, flow files may still be sent to this relationship "
    +                            + "based on the processor properties and the 
results of the fetch operation.")
    +            .build();
    +
    +    public static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
    +            .name("query-es-query").displayName("Query")
    +            .description("The Lucene-style query to run against 
ElasticSearch").required(true)
    +            
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new 
PropertyDescriptor.Builder()
    +            .name("query-es-index").displayName("Index")
    +            .description("The name of the index to read 
from").required(true)
    +            
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new 
PropertyDescriptor.Builder()
    +            .name("query-es-type")
    +            .displayName("Type")
    +            .description(
    +                    "The (optional) type of this document, used by 
Elasticsearch for indexing and searching. If the property is empty or set "
    +                            + "to _all, the first document matching the 
identifier across all types will be retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor FIELDS = new 
PropertyDescriptor.Builder()
    +            .name("query-es-fields")
    +            .displayName("Fields")
    +            .description(
    +                    "A comma-separated list of fields to retrieve from the 
document. If the Fields property is left blank, "
    +                            + "then the entire document's source will be 
retrieved.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor SORT = new 
PropertyDescriptor.Builder()
    +            .name("query-es-sort")
    +            .displayName("Sort")
    +            .description(
    +                    "A sort parameter (e.g., timestamp:asc). If the Sort 
property is left blank, "
    +                            + "then the results will be retrieved in 
document order.")
    +            .required(false).expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("query-es-size").displayName("Page 
Size").defaultValue("20")
    +            .description("Determines how many documents to return per page 
during scrolling.")
    +            .required(true).expressionLanguageSupported(true)
    +            
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor LIMIT = new 
PropertyDescriptor.Builder()
    +            .name("query-es-limit").displayName("Limit")
    +            .description("If set, limits the number of results that will 
be returned.")
    +            .required(false).expressionLanguageSupported(true)
    +            
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    +
    +    public static final PropertyDescriptor TARGET = new 
PropertyDescriptor.Builder()
    +            .name("query-es-target")
    +            .displayName("Target")
    +            .description(
    +                    "Indicates where the results should be placed.  In the 
case of 'Flow file content', the JSON "
    +                            + "response will be written as the content of 
the flow file.  In the case of 'Flow file attributes', "
    +                            + "the original flow file (if applicable) will 
be cloned for each result, and all return fields will be placed "
    +                            + "in a flow file attribute of the same name, 
but prefixed by 'es.result.'")
    +            .required(true).expressionLanguageSupported(false)
    +            .defaultValue(TARGET_FLOW_FILE_CONTENT)
    +            .allowableValues(TARGET_FLOW_FILE_CONTENT, 
TARGET_FLOW_FILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_RETRY);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(ES_URL);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(USERNAME);
    +        descriptors.add(PASSWORD);
    +        descriptors.add(CONNECT_TIMEOUT);
    +        descriptors.add(RESPONSE_TIMEOUT);
    +        descriptors.add(QUERY);
    +        descriptors.add(PAGE_SIZE);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(FIELDS);
    +        descriptors.add(SORT);
    +        descriptors.add(LIMIT);
    +        descriptors.add(TARGET);
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        super.setup(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session)
    +            throws ProcessException {
    +
    +        FlowFile flowFile = null;
    +        if (context.hasIncomingConnection()) {
    +            flowFile = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are 
self-loops then we can
    +            // continue on.
    +            // However, if we have no FlowFile and we have connections 
coming from other Processors,
    +            // then
    +            // we know that we should run only if we have a FlowFile.
    +            if (flowFile == null && context.hasNonLoopConnection()) {
    +                return;
    +            }
    +        }
    +
    +        OkHttpClient okHttpClient = getClient();
    +
    +        final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String query = 
context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final String docType = 
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
    +                .getValue();
    +        final int pageSize = 
context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
    +                .asInteger().intValue();
    +        final Integer limit = context.getProperty(LIMIT).isSet() ? 
context.getProperty(LIMIT)
    +                
.evaluateAttributeExpressions(flowFile).asInteger().intValue() : null;
    +        final String fields = context.getProperty(FIELDS).isSet() ? 
context.getProperty(FIELDS)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final String sort = context.getProperty(SORT).isSet() ? 
context.getProperty(SORT)
    +                .evaluateAttributeExpressions(flowFile).getValue() : null;
    +        final boolean targetIsContent = 
context.getProperty(TARGET).getValue()
    +                .equals(TARGET_FLOW_FILE_CONTENT);
    +
    +        // Authentication
    +        final String username = context.getProperty(USERNAME).getValue();
    +        final String password = context.getProperty(PASSWORD).getValue();
    +
    +        final ComponentLog logger = getLogger();
    +
    +        int fromIndex = 0;
    +        int numResults;
    +
    +        try {
    +            logger.debug("Querying {}/{} from Elasticsearch: {}", new 
Object[] { index, docType,
    +                    query });
    +
    +            final long startNanos = System.nanoTime();
    +            // read the url property from the context
    +            final String urlstr = 
StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
    +
    +            boolean hitLimit = false;
    +            do {
    +                int mPageSize = pageSize;
    +                if (limit != null && limit <= (fromIndex + pageSize)) {
    +                    mPageSize = limit - fromIndex;
    +                    hitLimit = true;
    +                }
    +
    +                final URL queryUrl = buildRequestURL(urlstr, query, index, 
docType, fields, sort,
    +                        mPageSize, fromIndex);
    +
    +                final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, queryUrl,
    +                        username, password, "GET", null);
    +                numResults = this.getPage(getResponse, queryUrl, context, 
session, flowFile,
    +                        logger, startNanos, targetIsContent);
    +                fromIndex += pageSize;
    +            } while (numResults > 0 && !hitLimit);
    +
    +            if (flowFile != null) {
    +                session.remove(flowFile);
    +            }
    +        } catch (IOException ioe) {
    +            logger.error(
    +                    "Failed to read from Elasticsearch due to {}, this may 
indicate an error in configuration "
    +                            + "(hosts, username/password, etc.). Routing 
to retry",
    +                    new Object[] { ioe.getLocalizedMessage() }, ioe);
    +            if (flowFile != null) {
    +                session.transfer(flowFile, REL_RETRY);
    +            }
    +            context.yield();
    +
    +        } catch (RetryableException e) {
    +            logger.error(e.getMessage(), new Object[] { 
e.getLocalizedMessage() }, e);
    +            if (flowFile != null) {
    +                session.transfer(flowFile, REL_RETRY);
    +            }
    +            context.yield();
    +        } catch (Exception e) {
    +            logger.error("Failed to read {} from Elasticsearch due to {}", 
new Object[] { flowFile,
    +                    e.getLocalizedMessage() }, e);
    +            if (flowFile != null) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            }
    +            context.yield();
    +        }
    +    }
    +
    +    private int getPage(final Response getResponse, final URL url, final 
ProcessContext context,
    +            final ProcessSession session, FlowFile flowFile, final 
ComponentLog logger,
    +            final long startNanos, boolean targetIsContent)
    +            throws IOException {
    +        List<FlowFile> page = new ArrayList<>();
    +        final int statusCode = getResponse.code();
    +
    +        if (isSuccess(statusCode)) {
    +            ResponseBody body = getResponse.body();
    +            final byte[] bodyBytes = body.bytes();
    +            JsonNode responseJson = parseJsonResponse(new 
ByteArrayInputStream(bodyBytes));
    +            JsonNode hits = responseJson.get("hits").get("hits");
    +
    +            for(int i = 0; i < hits.size(); i++) {
    +                JsonNode hit = hits.get(i);
    +                String retrievedId = hit.get("_id").asText();
    +                String retrievedIndex = hit.get("_index").asText();
    +                String retrievedType = hit.get("_type").asText();
    +
    +                FlowFile documentFlowFile = null;
    +                if (flowFile != null) {
    +                    documentFlowFile = targetIsContent ? 
session.create(flowFile) : session.clone(flowFile);
    +                } else {
    +                    documentFlowFile = session.create();
    +                }
    +
    +                JsonNode source = hit.get("_source");
    +                documentFlowFile = session.putAttribute(documentFlowFile, 
"es.index", retrievedIndex);
    +                documentFlowFile = session.putAttribute(documentFlowFile, 
"es.type", retrievedType);
    +
    +                if (targetIsContent) {
    +                    documentFlowFile = 
session.putAttribute(documentFlowFile, "filename", retrievedId);
    +                    documentFlowFile = session.write(documentFlowFile, out 
-> {
    --- End diff --
    
    Since _source is a JSON object, I recommend setting the mime.type attribute 
to application/json when the target is content.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to