Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/733#discussion_r79827737 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Response; +import okhttp3.ResponseBody; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({ "elasticsearch", "query", "read", "get", "http" }) +@CapabilityDescription("Queries Elasticsearch using the specified connection properties. 
" + + "Note that the full body of each page of documents will be read into memory before being " + + "written to Flow Files for transfer. Also note that the Elasticsearch max_result_window index " + + "setting is the upper bound on the number of records that can be retrieved using this query. " + + "To retrieve more records, use the ScrollElasticsearchHttp processor.") +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"), + @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), + @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"), + @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of " + + "each result will be placed into corresponding attributes with this prefix.") }) +public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + + private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + private static final String QUERY_QUERY_PARAM = "q"; + private static final String SORT_QUERY_PARAM = "sort"; + private static final String FROM_QUERY_PARAM = "from"; + private static final String SIZE_QUERY_PARAM = "size"; + + public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content"; + public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes"; + private static final String ATTRIBUTE_PREFIX = "es.result."; + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description( + "All FlowFiles that are read from Elasticsearch are routed to this relationship.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. 
Note that only incoming " + + "flow files will be routed to failure.").build(); + + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description( + "A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may " + + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship " + + "based on the processor properties and the results of the fetch operation.") + .build(); + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("query-es-query").displayName("Query") + .description("The Lucene-style query to run against ElasticSearch").required(true) + .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("query-es-index").displayName("Index") + .description("The name of the index to read from").required(true) + .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("query-es-type") + .displayName("Type") + .description( + "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set " + + "to _all, the first document matching the identifier across all types will be retrieved.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() + .name("query-es-fields") + .displayName("Fields") + .description( + "A comma-separated list of fields to retrieve from the document. 
If the Fields property is left blank, " + + "then the entire document's source will be retrieved.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder() + .name("query-es-sort") + .displayName("Sort") + .description( + "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, " + + "then the results will be retrieved in document order.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder() + .name("query-es-size").displayName("Page Size").defaultValue("20") + .description("Determines how many documents to return per page during scrolling.") + .required(true).expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build(); + + public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("query-es-limit").displayName("Limit") + .description("If set, limits the number of results that will be returned.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build(); + + public static final PropertyDescriptor TARGET = new PropertyDescriptor.Builder() + .name("query-es-target") + .displayName("Target") + .description( + "Indicates where the results should be placed. In the case of 'Flow file content', the JSON " + + "response will be written as the content of the flow file. 
In the case of 'Flow file attributes', " + + "the original flow file (if applicable) will be cloned for each result, and all return fields will be placed " + + "in a flow file attribute of the same name, but prefixed by 'es.result.'") + .required(true).expressionLanguageSupported(false) + .defaultValue(TARGET_FLOW_FILE_CONTENT) + .allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_RETRY); + return Collections.unmodifiableSet(relationships); + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(ES_URL); + descriptors.add(PROP_SSL_CONTEXT_SERVICE); + descriptors.add(USERNAME); + descriptors.add(PASSWORD); + descriptors.add(CONNECT_TIMEOUT); + descriptors.add(RESPONSE_TIMEOUT); + descriptors.add(QUERY); + descriptors.add(PAGE_SIZE); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(FIELDS); + descriptors.add(SORT); + descriptors.add(LIMIT); + descriptors.add(TARGET); + + return Collections.unmodifiableList(descriptors); + } + + @OnScheduled + public void setup(ProcessContext context) { + super.setup(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) + throws ProcessException { + + FlowFile flowFile = null; + if (context.hasIncomingConnection()) { + flowFile = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can + // continue on. + // However, if we have no FlowFile and we have connections coming from other Processors, + // then + // we know that we should run only if we have a FlowFile. 
+ if (flowFile == null && context.hasNonLoopConnection()) { + return; + } + } + + OkHttpClient okHttpClient = getClient(); + + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile) + .getValue(); + final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile) + .getValue(); + final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile) + .getValue(); + final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile) + .asInteger().intValue(); + final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT) + .evaluateAttributeExpressions(flowFile).asInteger().intValue() : null; + final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS) + .evaluateAttributeExpressions(flowFile).getValue() : null; + final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT) + .evaluateAttributeExpressions(flowFile).getValue() : null; + final boolean targetIsContent = context.getProperty(TARGET).getValue() + .equals(TARGET_FLOW_FILE_CONTENT); + + // Authentication + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + + final ComponentLog logger = getLogger(); + + int fromIndex = 0; + int numResults; + + try { + logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType, + query }); + + final long startNanos = System.nanoTime(); + // read the url property from the context + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue()); + + boolean hitLimit = false; + do { + int mPageSize = pageSize; + if (limit != null && limit <= (fromIndex + pageSize)) { + mPageSize = limit - fromIndex; + hitLimit = true; + } + + final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort, + mPageSize, fromIndex); + + final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, queryUrl, + username, password, "GET", null); + numResults = this.getPage(getResponse, queryUrl, context, session, flowFile, + logger, startNanos, targetIsContent); + fromIndex += pageSize; + } while (numResults > 0 && !hitLimit); + + if (flowFile != null) { + session.remove(flowFile); + } + } catch (IOException ioe) { + logger.error( + "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration " + + "(hosts, username/password, etc.). Routing to retry", + new Object[] { ioe.getLocalizedMessage() }, ioe); + if (flowFile != null) { + session.transfer(flowFile, REL_RETRY); + } + context.yield(); + + } catch (RetryableException e) { + logger.error(e.getMessage(), new Object[] { e.getLocalizedMessage() }, e); + if (flowFile != null) { + session.transfer(flowFile, REL_RETRY); + } + context.yield(); + } catch (Exception e) { + logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile, + e.getLocalizedMessage() }, e); + if (flowFile != null) { + session.transfer(flowFile, REL_FAILURE); + } + context.yield(); + } + } + + private int getPage(final Response getResponse, final URL url, final ProcessContext context, + final ProcessSession session, FlowFile flowFile, final ComponentLog logger, + final long startNanos, boolean targetIsContent) + throws IOException { + List<FlowFile> page = new ArrayList<>(); + final int statusCode = getResponse.code(); + + if (isSuccess(statusCode)) { + ResponseBody body = getResponse.body(); + final byte[] bodyBytes = body.bytes(); + JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes)); + JsonNode hits = responseJson.get("hits").get("hits"); + + for(int i = 0; i < hits.size(); i++) { + JsonNode hit = hits.get(i); + String retrievedId = hit.get("_id").asText(); + String retrievedIndex = hit.get("_index").asText(); + String retrievedType = hit.get("_type").asText(); + + FlowFile documentFlowFile = null; + if (flowFile != 
null) { + documentFlowFile = targetIsContent ? session.create(flowFile) : session.clone(flowFile); + } else { + documentFlowFile = session.create(); + } + + JsonNode source = hit.get("_source"); + documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex); + documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType); + + if (targetIsContent) { + documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId); + documentFlowFile = session.write(documentFlowFile, out -> { --- End diff -- Since _source is a JSON object, I recommend setting the mime.type attribute to application/json when the target is content.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---