ChrisSamo632 commented on a change in pull request #5193: URL: https://github.com/apache/nifi/pull/5193#discussion_r676386467
########## File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java ########## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor { + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") + .description("All original flowfiles that don't cause an error to occur go to this relationship.").build(); + public static final Relationship REL_HITS = new Relationship.Builder().name("hits") + .description("Search hits are routed to this relationship.") + .build(); + public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations") + .description("Aggregations are routed to this relationship.") + .build(); + + public static final 
AllowableValue SPLIT_UP_YES = new AllowableValue( + "splitUp-yes", + "Yes", + "Split up results." + ); + public static final AllowableValue SPLIT_UP_NO = new AllowableValue( + "splitUp-no", + "No", + "Don't split up results." + ); + + public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder() + .name("el-rest-split-up-hits") + .displayName("Split up search results") + .description("Split up search results into one flowfile per result.") + .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES) + .defaultValue(SPLIT_UP_NO.getValue()) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder() + .name("el-rest-split-up-aggregations") + .displayName("Split up aggregation results") + .description("Split up aggregation results into one flowfile per result.") + .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES) + .defaultValue(SPLIT_UP_NO.getValue()) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + private static final Set<Relationship> relationships; + private static final List<PropertyDescriptor> propertyDescriptors; + + AtomicReference<ElasticSearchClientService> clientService; + String splitUpHits; + private String splitUpAggregations; + + final ObjectMapper mapper = new ObjectMapper(); + + static { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_ORIGINAL); + rels.add(REL_FAILURE); + rels.add(REL_HITS); + rels.add(REL_AGGREGATIONS); + relationships = Collections.unmodifiableSet(rels); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(QUERY); + descriptors.add(QUERY_ATTRIBUTE); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(CLIENT_SERVICE); + descriptors.add(SPLIT_UP_HITS); + descriptors.add(SPLIT_UP_AGGREGATIONS); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public 
Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class)); + + splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue(); + splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue(); + } + + @OnStopped + public void onStopped() { + this.clientService = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile input = null; + if (context.hasIncomingConnection()) { + input = session.get(); + + if (input == null && context.hasNonLoopConnection()) { + return; + } + } + + try { + final Q queryJsonParameters = buildJsonQueryParameters(input, context, session); + + List<FlowFile> hitsFlowFiles = new ArrayList<>(); + final StopWatch stopWatch = new StopWatch(true); + final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch); + + finishQuery(input, queryJsonParameters, session, response); + } catch (Exception ex) { + getLogger().error("Error processing flowfile.", ex); + if (input != null) { + session.transfer(input, REL_FAILURE); + } + context.yield(); + } + } + + abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException; Review comment: (as above) ########## File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java ########## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class 
AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor { + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") + .description("All original flowfiles that don't cause an error to occur go to this relationship.").build(); + public static final Relationship REL_HITS = new Relationship.Builder().name("hits") + .description("Search hits are routed to this relationship.") + .build(); + public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations") + .description("Aggregations are routed to this relationship.") + .build(); + + public static final AllowableValue SPLIT_UP_YES = new AllowableValue( + "splitUp-yes", + "Yes", + "Split up results." + ); + public static final AllowableValue SPLIT_UP_NO = new AllowableValue( + "splitUp-no", + "No", + "Don't split up results." + ); + + public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder() + .name("el-rest-split-up-hits") + .displayName("Split up search results") + .description("Split up search results into one flowfile per result.") + .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES) + .defaultValue(SPLIT_UP_NO.getValue()) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder() + .name("el-rest-split-up-aggregations") + .displayName("Split up aggregation results") + .description("Split up aggregation results into one flowfile per result.") + .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES) + .defaultValue(SPLIT_UP_NO.getValue()) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + private static final Set<Relationship> relationships; + private static final List<PropertyDescriptor> propertyDescriptors; + + AtomicReference<ElasticSearchClientService> clientService; + String splitUpHits; + 
private String splitUpAggregations; + + final ObjectMapper mapper = new ObjectMapper(); + + static { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_ORIGINAL); + rels.add(REL_FAILURE); + rels.add(REL_HITS); + rels.add(REL_AGGREGATIONS); + relationships = Collections.unmodifiableSet(rels); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(QUERY); + descriptors.add(QUERY_ATTRIBUTE); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(CLIENT_SERVICE); + descriptors.add(SPLIT_UP_HITS); + descriptors.add(SPLIT_UP_AGGREGATIONS); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class)); + + splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue(); + splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue(); + } + + @OnStopped + public void onStopped() { + this.clientService = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile input = null; + if (context.hasIncomingConnection()) { + input = session.get(); + + if (input == null && context.hasNonLoopConnection()) { + return; + } + } + + try { + final Q queryJsonParameters = buildJsonQueryParameters(input, context, session); + + List<FlowFile> hitsFlowFiles = new ArrayList<>(); + final StopWatch stopWatch = new StopWatch(true); + final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch); + + finishQuery(input, queryJsonParameters, session, response); + } catch (Exception ex) { + 
getLogger().error("Error processing flowfile.", ex); + if (input != null) { + session.transfer(input, REL_FAILURE); + } + context.yield(); + } + } + + abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException; + + void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context, + final ProcessSession session) throws IOException { + final String query = getQuery(input, context, session); + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue(); + final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue(); + final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet() + ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue() + : null; + + queryJsonParameters.setQuery(query); + queryJsonParameters.setIndex(index); + queryJsonParameters.setType(type); + queryJsonParameters.setQueryAttr(queryAttr); + } + + abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session, Review comment: (as above) ########## File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> { + public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue( + "splitUp-combine", + "Combine", + "Combine results from all responses (one flowfile per entire paginated result set of hits). 
" + + "Note that aggregations cannot be paged, they are generated across the entire result set and " + + "returned as part of the first page. Results are output with one JSON object per line " + + "(allowing hits to be combined from multiple pages without loading all results into memory)." + ); + + public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SPLIT_UP_HITS) + .description("Split up search results into one flowfile per result or combine all hits from all pages into a single flowfile.") + .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES, SPLIT_UP_COMBINE) + .build(); + + public static final AllowableValue PAGINATION_SEARCH_AFTER = new AllowableValue( + "pagination-search_after", + "Search After", + "Use Elasticsearch \"search_after\" to page sorted results." + ); + public static final AllowableValue PAGINATION_POINT_IN_TIME = new AllowableValue( + "pagination-pit", + "Point in Time", + "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results." + ); + public static final AllowableValue PAGINATION_SCROLL = new AllowableValue( + "pagination-scroll", + "Scroll", + "Use Elasticsearch \"scroll\" to page results." + ); + + public static final PropertyDescriptor PAGINATION_TYPE = new PropertyDescriptor.Builder() + .name("el-rest-pagination-type") + .displayName("Pagination Type") + .description("Pagination method to use. 
Not all types are available for all Elasticsearch versions, " + + "check the Elasticsearch docs to confirm which are applicable and recommended for your service.") + .allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER, PAGINATION_POINT_IN_TIME) + .defaultValue(PAGINATION_SCROLL.getValue()) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + public static final PropertyDescriptor PAGINATION_KEEP_ALIVE = new PropertyDescriptor.Builder() + .name("el-rest-pagination-keep-alive") + .displayName("Pagination Keep Alive") + .description("Pagination \"keep_alive\" period. Period Elasticsearch will keep the scroll/pit cursor alive " + + "in between requests (this is not the time expected for all pages to be returned, but the maximum " + + "allowed time between requests for page retrieval).") + .required(true) + .defaultValue("10 mins") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, 24, TimeUnit.HOURS)) + .build(); + + static final List<PropertyDescriptor> paginatedPropertyDescriptors; + static { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(QUERY_ATTRIBUTE); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(CLIENT_SERVICE); + descriptors.add(SPLIT_UP_HITS); + descriptors.add(SPLIT_UP_AGGREGATIONS); + descriptors.add(PAGINATION_TYPE); + descriptors.add(PAGINATION_KEEP_ALIVE); + + paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors); + } + + // output as newline delimited JSON (allows for multiple pages of results to be appended to existing FlowFiles without retaining all hits in memory) + private final ObjectWriter writer = mapper.writer().withRootValueSeparator("\n"); + + String paginationType; Review comment: (as above) -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
