ChrisSamo632 commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r676386467



##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;

Review comment:
       (as above)

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;
+
+    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final FlowFile input, final ProcessContext context,
+                                           final ProcessSession session) throws IOException {
+        final String query = getQuery(input, context, session);
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        queryJsonParameters.setQuery(query);
+        queryJsonParameters.setIndex(index);
+        queryJsonParameters.setType(type);
+        queryJsonParameters.setQueryAttr(queryAttr);
+    }
+
+    abstract SearchResponse doQuery(final Q queryJsonParameters, final List<FlowFile> hitsFlowFiles, final ProcessSession session,

Review comment:
       (as above)

##########
File path: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+    public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue(
+            "splitUp-combine",
+            "Combine",
+            "Combine results from all responses (one flowfile per entire paginated result set of hits). " +
+                    "Note that aggregations cannot be paged, they are generated across the entire result set and " +
+                    "returned as part of the first page. Results are output with one JSON object per line " +
+                    "(allowing hits to be combined from multiple pages without loading all results into memory)."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SPLIT_UP_HITS)
+            .description("Split up search results into one flowfile per result or combine all hits from all pages into a single flowfile.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES, SPLIT_UP_COMBINE)
+            .build();
+
+    public static final AllowableValue PAGINATION_SEARCH_AFTER = new AllowableValue(
+            "pagination-search_after",
+            "Search After",
+            "Use Elasticsearch \"search_after\" to page sorted results."
+    );
+    public static final AllowableValue PAGINATION_POINT_IN_TIME = new AllowableValue(
+            "pagination-pit",
+            "Point in Time",
+            "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results."
+    );
+    public static final AllowableValue PAGINATION_SCROLL = new AllowableValue(
+            "pagination-scroll",
+            "Scroll",
+            "Use Elasticsearch \"scroll\" to page results."
+    );
+
+    public static final PropertyDescriptor PAGINATION_TYPE = new PropertyDescriptor.Builder()
+            .name("el-rest-pagination-type")
+            .displayName("Pagination Type")
+            .description("Pagination method to use. Not all types are available for all Elasticsearch versions, " +
+                    "check the Elasticsearch docs to confirm which are applicable and recommended for your service.")
+            .allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER, PAGINATION_POINT_IN_TIME)
+            .defaultValue(PAGINATION_SCROLL.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PAGINATION_KEEP_ALIVE = new PropertyDescriptor.Builder()
+            .name("el-rest-pagination-keep-alive")
+            .displayName("Pagination Keep Alive")
+            .description("Pagination \"keep_alive\" period. Period Elasticsearch will keep the scroll/pit cursor alive " +
+                    "in between requests (this is not the time expected for all pages to be returned, but the maximum " +
+                    "allowed time between requests for page retrieval).")
+            .required(true)
+            .defaultValue("10 mins")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, 24, TimeUnit.HOURS))
+            .build();
+
+    static final List<PropertyDescriptor> paginatedPropertyDescriptors;
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+        descriptors.add(PAGINATION_TYPE);
+        descriptors.add(PAGINATION_KEEP_ALIVE);
+
+        paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    // output as newline delimited JSON (allows for multiple pages of results to be appended to existing FlowFiles without retaining all hits in memory)
+    private final ObjectWriter writer = mapper.writer().withRootValueSeparator("\n");
+
+    String paginationType;

Review comment:
       (as above)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to