gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r675468689



##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
##########
@@ -25,11 +25,11 @@
  * covers all CRUD-related operations that can be executed against an 
Elasticsearch index with documents.
  */
 public class IndexOperationRequest {
-    private String index;
-    private String type;
-    private String id;
-    private Map<String, Object> fields;
-    private Operation operation;
+    private final String index;
+    private final String type;
+    private final String id;
+    private final Map<String, Object> fields;
+    private final Operation operation;
 
     public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields, Operation operation) {

Review comment:
       This is perhaps also a good time to make the constructor parameters final.
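
To illustrate the suggestion, here is a minimal sketch of the class with both fields and constructor parameters declared final. The class name, the `Operation` values, and the defensive copy of `fields` are assumptions for illustration only and are not taken from the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for IndexOperationRequest; not the PR's actual code.
final class IndexOperationRequestSketch {
    // Assumed operation values, for illustration only.
    enum Operation { INDEX, UPDATE, DELETE }

    private final String index;
    private final String type;
    private final String id;
    private final Map<String, Object> fields;
    private final Operation operation;

    // Final parameters cannot be reassigned inside the constructor body.
    IndexOperationRequestSketch(final String index, final String type, final String id,
                                final Map<String, Object> fields, final Operation operation) {
        this.index = index;
        this.type = type;
        this.id = id;
        // Defensive copy (an addition of this sketch) so later changes to the
        // caller's map do not leak into the request.
        this.fields = fields == null
                ? Collections.<String, Object>emptyMap()
                : Collections.unmodifiableMap(new HashMap<String, Object>(fields));
        this.operation = operation;
    }

    String getIndex() { return index; }
    String getType() { return type; }
    String getId() { return id; }
    Map<String, Object> getFields() { return fields; }
    Operation getOperation() { return operation; }
}
```

Note that `final` on a parameter only prevents reassignment; it does not make the referenced map immutable, which is why the sketch copies it.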

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -165,10 +165,12 @@ private void setupClient(ConfigurationContext context) 
throws MalformedURLExcept
         this.client = builder.build();
     }
 
-    private Response runQuery(String endpoint, String query, String index, String type) {
-        StringBuilder sb = new StringBuilder()
-            .append("/").append(index);
-        if (type != null && !type.equals("")) {
+    private Response runQuery(String endpoint, String query, String index, String type, Map<String, String> requestParameters) {

Review comment:
       Consider making the parameters and the StringBuilder final here.
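
Roughly what I have in mind, as a sketch only: the helper below builds a request path with final parameters and a final `StringBuilder`. The class and method names are hypothetical, and the way the endpoint and request parameters are appended is an assumption, since the diff only shows the index and type handling. No URL encoding is attempted.

```java
import java.util.Map;

// Hypothetical helper; not the PR's runQuery implementation.
final class EndpointPathSketch {
    static String buildPath(final String endpoint, final String index, final String type,
                            final Map<String, String> requestParameters) {
        // The reference is final; the builder's contents can still be appended to.
        final StringBuilder sb = new StringBuilder().append("/").append(index);
        if (type != null && !type.isEmpty()) {
            sb.append("/").append(type);
        }
        sb.append("/").append(endpoint);

        // Assumed query-string format: ?key=value&key=value
        if (requestParameters != null && !requestParameters.isEmpty()) {
            char separator = '?';
            for (final Map.Entry<String, String> entry : requestParameters.entrySet()) {
                sb.append(separator).append(entry.getKey()).append('=').append(entry.getValue());
                separator = '&';
            }
        }
        return sb.toString();
    }
}
```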

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
##########
@@ -159,62 +166,141 @@
             <artifactId>nifi-ssl-context-service</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-artifact</artifactId>
+            <version>3.6.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-json</artifactId>
             <version>${nifi.groovy.version}</version>
             <scope>test</scope>
-       </dependency>
-       <dependency>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>5.6.16</version>
         </dependency>
     </dependencies>
 
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>3.0.0-M3</version>
+                </plugin>
+                <plugin>
+                    <groupId>com.github.alexcojocaru</groupId>
+                    <artifactId>elasticsearch-maven-plugin</artifactId>
+                    <version>6.19</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
     <profiles>
         <profile>
+            <!-- use with integration-tests only (-default can be used if x-pack-ml permissions fixed) -->
             <id>integration-6</id>

Review comment:
       Can you provide some examples of correctly running the integration tests?
   
   When running `mvn clean install -Pintegration-tests -Pintegration-6` and `mvn clean install -Pintegration-test -Pintegration-7`, I get the following error:
   ```
   [ERROR] Failed to execute goal com.github.alexcojocaru:elasticsearch-maven-plugin:6.19:runforked (start-elasticsearch) on project nifi-elasticsearch-client-service: Elasticsearch [0]: Cannot execute command '[bin/elasticsearch-plugin, list]' in directory '/Users/jgresock/workspace/nifi.PR/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/target/elasticsearch0'
   [ERROR] Output:
   [ERROR]
   [ERROR] Error:
   [ERROR] Exception in thread "main" java.lang.NoSuchMethodError: org.elasticsearch.cli.MultiCommand.<init>(Ljava/lang/String;)V
   [ERROR]      at org.elasticsearch.plugins.PluginCli.<init>(PluginCli.java:39)
   [ERROR]      at org.elasticsearch.plugins.PluginCli.main(PluginCli.java:47): Process exited with an error: 1 (Exit value: 1)
   ```
   Not sure if this is user error, but I wanted to bring it up.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends 
JsonQueryParameters> extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to 
occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();

Review comment:
       What do you think about adding a BATCH_SIZE property similar to 
`ListS3`, which would commit the session after the configured batch size?  My 
concern is that on very large result sets, this processor could be running for 
a long time without emitting any flow files.  If BATCH_SIZE is unset, you could 
use the existing behavior.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends 
JsonQueryParameters> extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to 
occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new 
AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = 
context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, 
context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, 
hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException;

Review comment:
       Why not protected here?
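
For context on the question, a small sketch of the shape I was expecting. A package-private abstract method can only be implemented by subclasses in the same package, while a protected one can be implemented from any package. The class names and the simplified `String` signature below are hypothetical and only illustrate the modifier.

```java
// Hypothetical, simplified stand-in for the abstract processor.
abstract class AbstractQuerySketch<Q> {
    // protected: subclasses in any package may override this hook,
    // but it is still hidden from unrelated callers outside the package.
    protected abstract Q buildQueryParameters(final String input);

    // Template method that calls the subclass hook.
    public final String run(final String input) {
        return String.valueOf(buildQueryParameters(input));
    }
}

// Hypothetical concrete subclass that wraps the input in a query object.
final class WrappingQuerySketch extends AbstractQuerySketch<String> {
    @Override
    protected String buildQueryParameters(final String input) {
        return "{\"query\":" + input + "}";
    }
}
```

If the intent is to keep these hooks out of reach of third-party extensions, package scope is a legitimate choice, but it would be worth a short comment saying so.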

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends 
JsonQueryParameters> extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to 
occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;

Review comment:
       Is there a reason you chose package scope instead of protected for 
clientService and splitUpHits?

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ConsumeElasticsearch</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+</head>
+<body>
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster in a paginated manner.

Review comment:
       Since the query doesn't have to be from Kibana, what about "be able to 
take a JSON query (e.g., from Kibana) and execute it..."?
   

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name 
of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The 
number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of 
the page (request) in which the results were returned that are in the output 
flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "read", 
"json"})
+@CapabilityDescription("A processor that allows the user to run a paginated 
query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "It will use the flowfile's content for the query unless the QUERY 
attribute is populated. " +
+        "Search After/Point in Time queries must include a valid \"sort\" 
field.")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and 
converted into the resulting flowfiles.")
+public class PaginatedJsonQueryElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.addAll(paginatedPropertyDescriptors);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    void finishQuery(final FlowFile input, final PaginatedJsonQueryParameters paginatedQueryJsonParameters,

Review comment:
       Consider making these methods protected.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends 
JsonQueryParameters> extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to 
occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();

Review comment:
       Also, I just noticed that you have an ObjectWriter in 
`AbstractPaginatedJsonQueryElasticsearch` -- any way you can combine these?

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,67 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PaginatedJsonQueryElasticsearch</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+</head>
+<body>
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and 
Elasticsearch 5.X and newer. It is designed
+    to be able to take a query from Kibana and execute it as-is against an 
Elasticsearch cluster in a paginated manner.
+    Like all processors in the "restapi" bundle, it uses the official Elastic 
client APIs, so it supports leader detection.</p>
+    <p>The query to execute can be provided either in the Query configuration 
property or in an attribute on a flowfile. In
+    the latter case, the name of the attribute (Expression Language is 
supported here) must be provided in the Query
+    property. If no Query property is provided and a flowfile is received, the 
content of the flowfile is used as the Query JSON.</p>

Review comment:
       This seems unclear to me based on the actual functionality.  How about:
   ```suggestion
       <p>The query JSON to execute can be provided either in the Query 
configuration property or in the content of the 
       flowfile.  If the Query Attribute property is configured, the executed 
query JSON will be placed in the attribute provided by this property.</p>
   ```

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, 
Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, 
requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {
+        try {
+            final HttpEntity scrollEntity = new NStringEntity(scroll, 
ContentType.APPLICATION_JSON);
+            final Response response = client.performRequest("POST", 
"/_search/scroll", Collections.emptyMap(), scrollEntity);
+
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public String initialisePointInTime(String index, String keepAlive) {

Review comment:
       Consider declaring these parameters final.
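
   A minimal sketch of the idea, using a hypothetical helper (`pitEndpoint` is 
   not part of this PR and only illustrates the modifier): a final parameter 
   cannot be reassigned inside the method body.

   ```java
   class FinalParameterSketch {
       // Hypothetical helper, not from the PR. Both parameters are final,
       // so an accidental reassignment fails at compile time.
       static String pitEndpoint(final String index, final String keepAlive) {
           // index = index.trim();  // would not compile: index is final
           final StringBuilder sb = new StringBuilder("/").append(index).append("/_pit");
           if (keepAlive != null && !keepAlive.trim().isEmpty()) {
               sb.append("?keep_alive=").append(keepAlive);
           }
           return sb.toString();
       }
   }
   ```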

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, 
Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, 
requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {
+        try {
+            final HttpEntity scrollEntity = new NStringEntity(scroll, 
ContentType.APPLICATION_JSON);
+            final Response response = client.performRequest("POST", 
"/_search/scroll", Collections.emptyMap(), scrollEntity);
+
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public String initialisePointInTime(String index, String keepAlive) {
+        try {
+            final Map<String, String> params = new HashMap<String, String>() {{
+                if (StringUtils.isNotBlank(keepAlive)) {
+                    put("keep_alive", keepAlive);
+                }
+            }};
+            final Response response = client.performRequest("POST", index + 
"/_pit", params);
+            final String body = 
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);

Review comment:
       Consider declaring a Charset constant for UTF-8, since it's used in 
multiple places here.
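
   Something along these lines, as a sketch only. The class, the constant name 
   `RESPONSE_CHARSET`, and the two helpers are assumptions for illustration and 
   do not exist in the PR:

   ```java
   import java.nio.charset.Charset;
   import java.nio.charset.StandardCharsets;

   class CharsetConstantSketch {
       // Hypothetical constant: declared once, reused wherever a body is encoded or decoded.
       static final Charset RESPONSE_CHARSET = StandardCharsets.UTF_8;

       // Decodes a raw response body with the shared charset.
       static String decode(final byte[] body) {
           return new String(body, RESPONSE_CHARSET);
       }

       // Encodes a request body with the same charset.
       static byte[] encode(final String body) {
           return body.getBytes(RESPONSE_CHARSET);
       }
   }
   ```

   Every call site then refers to the one constant, so a later change of 
   charset is a single-line edit.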

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, 
Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, 
requestParameters);
+            return buildSearchResponse(response);

Review comment:
       One thing I noticed during testing was that if I provided a 'type' value 
in Elasticsearch 7, the query completed with 0 results.  I understand that this 
would be a user error, but I also noticed the following in the logs:
   ```
   2021-07-23 06:22:59,910 WARN [I/O dispatcher 1] 
org.elasticsearch.client.RestClient request [POST 
http://localhost:9200/my_index/test/_search?scroll=600s] returned 2 warnings: 
[299 Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 
"Elasticsearch built-in security features are not enabled. Without 
authentication, your cluster could be accessible to anyone. See 
https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html
 to enable security."],[299 
Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 "[types removal] 
Specifying types in search requests is deprecated."]
   2021-07-23 06:22:59,919 WARN [I/O dispatcher 1] 
org.elasticsearch.client.RestClient request [DELETE 
http://localhost:9200/_search/scroll] returned 1 warnings: [299 
Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 "Elasticsearch 
built-in security features are not enabled. Without authentication, your 
cluster could be accessible to anyone. See 
https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html
 to enable security."]
   ```
   It appears these warnings are pulled from the HTTP Response Headers, which 
are available in the Response object, so if you wanted, you could add these 
warnings to the SearchResponse in order to propagate the warnings to the 
processor's logger.  On the other hand, the user could view the nifi-app.log to 
find this warning.
   
   This PR is already quite substantial, so I'll leave it up to you as to 
whether to include this here (or not), or whether it could be covered in a 
future JIRA issue.
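
   As a rough sketch of the extraction step, assuming the raw `Warning` header 
   values have already been read from the Response (how they are read, and how 
   they would be attached to SearchResponse, is left open here). The class and 
   method names are hypothetical. It pulls the quoted text out of values shaped 
   like the ones in the log above:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   class WarningHeaderSketch {
       // Matches: <3-digit code> <agent> "<warning text>"
       private static final Pattern WARNING = Pattern.compile("^\\d{3} \\S+ \"(.*)\"$");

       // Returns the quoted warning text of each header value,
       // or the trimmed raw value when it does not match the expected shape.
       static List<String> extractWarnings(final List<String> headerValues) {
           final List<String> warnings = new ArrayList<>();
           for (final String value : headerValues) {
               final String trimmed = value.trim();
               final Matcher matcher = WARNING.matcher(trimmed);
               warnings.add(matcher.matches() ? matcher.group(1) : trimmed);
           }
           return warnings;
       }
   }
   ```

   The processor could then log each returned string at WARN level.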

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
##########
@@ -111,11 +112,11 @@ default String getQuery(FlowFile input, ProcessContext 
context, ProcessSession s
         if (context.getProperty(QUERY).isSet()) {
             retVal = 
context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
         } else if (input != null) {
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
             session.exportTo(input, out);
             out.close();
 
-            retVal = new String(out.toByteArray());
+            retVal = out.toString();

Review comment:
       What do you think about passing the UTF-8 charset into toString()?
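
   For context: the no-argument `toString()` decodes with the platform default 
   charset, so the result can differ between hosts. A minimal sketch of the 
   explicit form, with a hypothetical helper name; it uses the 
   `toString(String)` overload because `toString(Charset)` requires Java 10 or 
   newer:

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.UnsupportedEncodingException;
   import java.nio.charset.StandardCharsets;

   class ExplicitCharsetSketch {
       // Hypothetical helper: decodes the buffered flowfile content as UTF-8.
       static String contentAsString(final ByteArrayOutputStream out) {
           try {
               return out.toString(StandardCharsets.UTF_8.name());
           } catch (final UnsupportedEncodingException e) {
               // Every JVM is required to support UTF-8, so this is not expected.
               throw new IllegalStateException(e);
           }
       }
   }
   ```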

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
##########
@@ -40,7 +40,8 @@ public boolean hasErrors() {
         return hasErrors;
     }
 
-    public static IndexOperationResponse fromJsonResponse(String response) 
throws IOException {
+    @SuppressWarnings("unchecked")
+    public static IndexOperationResponse fromJsonResponse(final String 
response) throws IOException {
         ObjectMapper mapper = new ObjectMapper();

Review comment:
       I realize this wasn't part of your PR, but it could be a good time to 
improve the usage of ObjectMapper here.  I feel like this class could store a 
static final ObjectReader, since creating an ObjectMapper is a (relatively) 
expensive operation.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, 
Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, 
requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public SearchResponse scroll(String scroll) {

Review comment:
       Consider declaring this parameter final.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends 
JsonQueryParameters> extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to 
occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();

Review comment:
       For clarity, since you only use write methods, what about making this 
an ObjectWriter?

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
     }
 
     @Override
-    public SearchResponse search(String query, String index, String type) {
-        Response response = runQuery("_search", query, index, type);
-        Map<String, Object> parsed = parseResponse(response);
+    public SearchResponse search(String query, String index, String type, 
Map<String, String> requestParameters) {
+        try {
+            final Response response = runQuery("_search", query, index, type, 
requestParameters);
+            return buildSearchResponse(response);
+        } catch (Exception ex) {

Review comment:
       Not that it adds a huge amount of value here, but consider declaring the 
exception as final.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends 
AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+    public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue(

Review comment:
       The "Split up..." properties are the only part that doesn't feel natural 
to me.  What about the following scheme:
   **Search Results Split** property, allowable values:
   1. Flow file per result
   2. Flow file per page
   3. Flow file per query
   
   **Aggregation Results Split** property, allowable values:
   1. Flow file per result
   2. Flow file per query
   
   ... or some variant thereof.  I had to run it through the processor with 
each type to figure out the difference between Split = No vs. Split = Combine.
   
   Edit: I understand now that these properties already existed in 
JsonQueryElasticsearch and you're just trying to reuse them.  I wonder if it 
would be better to make the properties in your new processors more intuitive 
rather than tying them to JsonQueryElasticsearch.
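
   To make the three proposed options concrete, here is a plain-Java sketch. 
   The enum, its constant names, and `flowFileCount` are all hypothetical and 
   only model how many output flow files each option would yield for a query 
   that returned several pages of hits:

   ```java
   class ResultSplitSketch {
       // Hypothetical names for the proposed "Search Results Split" values.
       enum SearchResultsSplit {
           PER_HIT("Flow file per result"),
           PER_PAGE("Flow file per page"),
           PER_QUERY("Flow file per query");

           private final String displayName;

           SearchResultsSplit(final String displayName) {
               this.displayName = displayName;
           }

           String getDisplayName() {
               return displayName;
           }
       }

       // Number of output flow files for one query, given the hit count of each page.
       static int flowFileCount(final SearchResultsSplit split, final int[] hitsPerPage) {
           switch (split) {
               case PER_HIT: {
                   int total = 0;
                   for (final int hits : hitsPerPage) {
                       total += hits;
                   }
                   return total;
               }
               case PER_PAGE:
                   return hitsPerPage.length;
               default:
                   return 1;
           }
       }
   }
   ```

   With pages of 10, 10 and 3 hits, the three options would give 23, 3 and 1 
   flow files respectively.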

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends 
JsonQueryParameters> extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to 
occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new 
AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = 
context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, 
context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, 
hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final 
ProcessContext context, final ProcessSession session) throws IOException;
+
+    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final 
FlowFile input, final ProcessContext context,
+                                           final ProcessSession session) 
throws IOException {
+        final String query = getQuery(input, context, session);
+        final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                ? 
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        queryJsonParameters.setQuery(query);
+        queryJsonParameters.setIndex(index);
+        queryJsonParameters.setType(type);
+        queryJsonParameters.setQueryAttr(queryAttr);
+    }
+
+    abstract SearchResponse doQuery(final Q queryJsonParameters, final 
List<FlowFile> hitsFlowFiles, final ProcessSession session,

Review comment:
       Same question for all these methods

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name 
of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The 
number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of 
the page (request) in which the results were returned that are in the output 
flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", 
"consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a 
paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" 
field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, 
searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the 
Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus 
the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and 
converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       I found what looks like a bug: the state is not actually cleared when 
the expirationTimestamp says it should be.  I configured a ConsumeElasticsearch 
processor with a Keep Alive time of 30 seconds and a Run Schedule of 1 second, 
and used a query that returned 15 pages of results (Split search/aggregation 
set to No).  Using the Scroll setting, this ran 15 times, and on the 16th run 
it started reporting 404 errors.  Once the 30 seconds had expired, the 
processor started producing results again, but it emitted Page 1 on every 
run, over and over.  The state didn't appear to have been cleared: 
pageCount was stuck at 15, expirationTimestamp remained static, etc.  The same 
thing happened for PIT and SearchAfter, though SearchAfter didn't output any 
errors once the result set was complete.
   
   Beyond the state clearing, we should consider how to handle the case where 
the result set completes, since we probably don't want to log an ERROR in what 
should be considered a successful case.
   
   Also, what is the intended behavior if the query runs longer than the Keep 
Alive time?  Should the processor start over on the query, or continue with the 
result set until the last page is reached, and then start over?
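   
   To make the expected state handling concrete, here is a minimal sketch of 
one possible answer to the questions above.  It is not the PR's code: the class 
`PaginationStateSketch`, its method names, and the plain `Map` standing in for 
the processor state are all assumptions for illustration.  It clears the state 
when the last page is reached, and starts over from page 1 when the stored 
expirationTimestamp has passed.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for the processor's stored state.
class PaginationStateSketch {
    static final String EXPIRATION = "expirationTimestamp";
    static final String PAGE_COUNT = "pageCount";

    // True when an expiration is stored and "now" is later than it.
    static boolean isExpired(final Map<String, String> state, final Instant now) {
        final String expiration = state.get(EXPIRATION);
        return expiration != null
                && now.isAfter(Instant.ofEpochMilli(Long.parseLong(expiration)));
    }

    // State to persist after a page has been retrieved at time "now".
    static Map<String, String> nextState(final Map<String, String> state, final Instant now,
                                         final boolean lastPageReached, final long keepAliveMillis) {
        if (lastPageReached) {
            // Result set complete: clear everything so the next run starts a new query.
            return new HashMap<>();
        }
        // Expired state is discarded, so page counting restarts from zero.
        final Map<String, String> next = isExpired(state, now) ? new HashMap<>() : new HashMap<>(state);
        final int pageCount = Integer.parseInt(next.getOrDefault(PAGE_COUNT, "0")) + 1;
        next.put(PAGE_COUNT, Integer.toString(pageCount));
        // Refresh the expiration: time of this request plus the keep alive interval.
        next.put(EXPIRATION, Long.toString(now.plusMillis(keepAliveMillis).toEpochMilli()));
        return next;
    }
}
```

   With this shape, reaching the last page is an ordinary state transition 
rather than an error path, and the expirationTimestamp moves forward on every 
successful request instead of remaining static.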

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name 
of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The 
number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of 
the page (request) in which the results were returned that are in the output 
flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", 
"consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a 
paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" 
field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, 
searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the 
Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus 
the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and 
converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       What would you consider the compelling reason to have this processor in 
addition to PaginatedJsonQueryElasticsearch?  The differences I see are the 
convenience of it being a source processor (whereas you'd have to put a 
GenerateFlowFile processor in front of PaginatedJsonQueryElasticsearch to 
achieve the same thing) and the query expiration that ConsumeElasticsearch 
adds.  I'm just wondering whether the two processors could be combined to 
serve the same purpose.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends 
JsonQueryParameters> extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("All original flowfiles that don't cause an error to 
occur go to this relationship.").build();
+    public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+            .description("Search hits are routed to this relationship.")
+            .build();
+    public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+            .description("Aggregations are routed to this relationship.")
+            .build();
+
+    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+            "splitUp-yes",
+            "Yes",
+            "Split up results."
+    );
+    public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+            "splitUp-no",
+            "No",
+            "Don't split up results."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-hits")
+            .displayName("Split up search results")
+            .description("Split up search results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new 
PropertyDescriptor.Builder()
+            .name("el-rest-split-up-aggregations")
+            .displayName("Split up aggregation results")
+            .description("Split up aggregation results into one flowfile per 
result.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+            .defaultValue(SPLIT_UP_NO.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    AtomicReference<ElasticSearchClientService> clientService;
+    String splitUpHits;
+    private String splitUpAggregations;
+
+    final ObjectMapper mapper = new ObjectMapper();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        rels.add(REL_HITS);
+        rels.add(REL_AGGREGATIONS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = new 
AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+        splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+        splitUpAggregations = 
context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.clientService = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final Q queryJsonParameters = buildJsonQueryParameters(input, 
context, session);
+
+            List<FlowFile> hitsFlowFiles = new ArrayList<>();
+            final StopWatch stopWatch = new StopWatch(true);
+            final SearchResponse response = doQuery(queryJsonParameters, 
hitsFlowFiles, session, input, stopWatch);
+
+            finishQuery(input, queryJsonParameters, session, response);
+        } catch (Exception ex) {
+            getLogger().error("Error processing flowfile.", ex);
+            if (input != null) {
+                session.transfer(input, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    abstract Q buildJsonQueryParameters(final FlowFile input, final 
ProcessContext context, final ProcessSession session) throws IOException;
+
+    void populateCommonJsonQueryParameters(final Q queryJsonParameters, final 
FlowFile input, final ProcessContext context,
+                                           final ProcessSession session) 
throws IOException {
+        final String query = getQuery(input, context, session);
+        final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+        final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                ? 
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                : null;
+
+        queryJsonParameters.setQuery(query);
+        queryJsonParameters.setIndex(index);
+        queryJsonParameters.setType(type);
+        queryJsonParameters.setQueryAttr(queryAttr);
+    }
+
+    abstract SearchResponse doQuery(final Q queryJsonParameters, final 
List<FlowFile> hitsFlowFiles, final ProcessSession session,
+                                    final FlowFile input, final StopWatch 
stopWatch) throws IOException;
+
+    abstract void finishQuery(final FlowFile input, final Q queryParameters, 
final ProcessSession session,
+                              final SearchResponse response) throws 
IOException;
+
+    FlowFile createChildFlowFile(final ProcessSession session, final FlowFile 
parent) {
+        return parent != null ? session.create(parent) : session.create();
+    }
+
+    private FlowFile writeAggregationFlowFileContents(final String name, final 
Integer number, final String json,
+                                                      final ProcessSession 
session, final FlowFile aggFlowFile,
+                                                      final Map<String, 
String> attributes) {
+        FlowFile ff = session.write(aggFlowFile, out -> 
out.write(json.getBytes()));
+        ff = session.putAllAttributes(ff, new HashMap<String, String>(){{
+            if (name != null) {
+                put("aggregation.name", name);
+            }
+            if (number != null) {
+                put("aggregation.number", number.toString());
+            }
+        }});
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    private void handleAggregations(final Map<String, Object> aggregations, 
final ProcessSession session,
+                                    final FlowFile parent, final Map<String, 
String> attributes,
+                                    final String transitUri, final StopWatch 
stopWatch) throws IOException {
+        if (aggregations != null && !aggregations.isEmpty()) {
+            final List<FlowFile> aggsFlowFiles = new ArrayList<>();
+            if (splitUpAggregations.equals(SPLIT_UP_YES.getValue())) {
+                int aggCount = 0;
+                for (final Map.Entry<String, Object> agg : 
aggregations.entrySet()) {
+                    final FlowFile aggFlowFile = createChildFlowFile(session, 
parent);
+                    final String aggJson = 
mapper.writeValueAsString(agg.getValue());
+                    
aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount, 
aggJson, session, aggFlowFile, attributes));
+                }
+            } else {
+                final FlowFile aggFlowFile = createChildFlowFile(session, 
parent);
+                final String json = mapper.writeValueAsString(aggregations);
+                aggsFlowFiles.add(writeAggregationFlowFileContents(null, null, 
json, session, aggFlowFile, attributes));
+            }
+
+            if (!aggsFlowFiles.isEmpty()) {
+                session.transfer(aggsFlowFiles, REL_AGGREGATIONS);
+                aggsFlowFiles.forEach(ff -> 
session.getProvenanceReporter().receive(ff, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS)));
+            }
+        }
+    }
+
+    private FlowFile writeHitFlowFile(final int count, final String json, 
final ProcessSession session,
+                                      final FlowFile hitFlowFile, final 
Map<String, String> attributes) {
+        final FlowFile ff = session.write(hitFlowFile, out -> 
out.write(json.getBytes()));
+        attributes.put("hit.count", Integer.toString(count));
+
+        return session.putAllAttributes(ff, attributes);
+    }
+
+    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q 
queryJsonParameters, final ProcessSession session,
+                              final FlowFile parent, final Map<String, String> 
attributes, final List<FlowFile> hitsFlowFiles,
+                              final String transitUri, final StopWatch 
stopWatch) throws IOException {
+        if (hits != null && !hits.isEmpty()) {
+            if (SPLIT_UP_YES.getValue().equals(splitUpHits)) {
+                for (final Map<String, Object> hit : hits) {
+                    final FlowFile hitFlowFile = createChildFlowFile(session, 
parent);
+                    final String json = mapper.writeValueAsString(hit);
+                    hitsFlowFiles.add(writeHitFlowFile(1, json, session, 
hitFlowFile, attributes));
+                }
+            } else {
+                final FlowFile hitFlowFile = createChildFlowFile(session, 
parent);
+                final String json = mapper.writeValueAsString(hits);
+                hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, 
hitFlowFile, attributes));
+            }
+        }
+
+        // output any results
+        if (!hitsFlowFiles.isEmpty()) {

Review comment:
       It looks like this method will always return an empty list, then?  
Perhaps add a method comment explaining the purpose of the returned 
`List<FlowFile>`.  Can you walk me through the logic of returning anything if 
it will always be empty?
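   
   To illustrate the concern, here is a small sketch with hypothetical names 
(`FlushSketch`, with Strings standing in for FlowFiles).  It assumes the method 
clears the accumulator after transferring its contents, which is my reading of 
the diff and may not match the actual code.  Under that assumption the returned 
list is empty on every call, so a `void` method would express the same contract.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the pattern in question; Strings play the role of FlowFiles.
class FlushSketch {
    // Suspected shape: output what has accumulated, clear the accumulator, then return it.
    static List<String> flushAndReturn(final List<String> pending, final List<String> transferred) {
        if (!pending.isEmpty()) {
            transferred.addAll(pending);
            pending.clear();
        }
        return pending; // empty on every path
    }

    // Alternative: same side effects, no return value for callers to misread.
    static void flush(final List<String> pending, final List<String> transferred) {
        transferred.addAll(pending);
        pending.clear();
    }
}
```

   If the caller does rely on the returned list (for example, to keep 
accumulating hits across pages), a method comment saying so would answer the 
question.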

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends 
AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+    public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue(
+            "splitUp-combine",
+            "Combine",
+            "Combine results from all responses (one flowfile per entire 
paginated result set of hits). " +
+                    "Note that aggregations cannot be paged, they are 
generated across the entire result set and " +
+                    "returned as part of the first page. Results are output 
with one JSON object per line " +
+                    "(allowing hits to be combined from multiple pages without 
loading all results into memory)."
+    );
+
+    public static final PropertyDescriptor SPLIT_UP_HITS = new 
PropertyDescriptor.Builder()
+            
.fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SPLIT_UP_HITS)
+            .description("Split up search results into one flowfile per result 
or combine all hits from all pages into a single flowfile.")
+            .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES, SPLIT_UP_COMBINE)
+            .build();
+
+    public static final AllowableValue PAGINATION_SEARCH_AFTER = new 
AllowableValue(
+            "pagination-search_after",
+            "Search After",
+            "Use Elasticsearch \"search_after\" to page sorted results."
+    );
+    public static final AllowableValue PAGINATION_POINT_IN_TIME = new 
AllowableValue(
+            "pagination-pit",
+            "Point in Time",
+            "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page 
sorted results."
+    );
+    public static final AllowableValue PAGINATION_SCROLL = new AllowableValue(
+            "pagination-scroll",
+            "Scroll",
+            "Use Elasticsearch \"scroll\" to page results."
+    );
+
+    public static final PropertyDescriptor PAGINATION_TYPE = new 
PropertyDescriptor.Builder()
+            .name("el-rest-pagination-type")
+            .displayName("Pagination Type")
+            .description("Pagination method to use. Not all types are 
available for all Elasticsearch versions, " +
+                    "check the Elasticsearch docs to confirm which are 
applicable and recommended for your service.")
+            .allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER, 
PAGINATION_POINT_IN_TIME)
+            .defaultValue(PAGINATION_SCROLL.getValue())
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PAGINATION_KEEP_ALIVE = new 
PropertyDescriptor.Builder()
+            .name("el-rest-pagination-keep-alive")
+            .displayName("Pagination Keep Alive")
+            .description("Pagination \"keep_alive\" period. Period 
Elasticsearch will keep the scroll/pit cursor alive " +
+                    "in between requests (this is not the time expected for 
all pages to be returned, but the maximum " +
+                    "allowed time between requests for page retrieval).")
+            .required(true)
+            .defaultValue("10 mins")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, 24, TimeUnit.HOURS))
+            .build();
+
+    static final List<PropertyDescriptor> paginatedPropertyDescriptors;
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+        descriptors.add(SPLIT_UP_HITS);
+        descriptors.add(SPLIT_UP_AGGREGATIONS);
+        descriptors.add(PAGINATION_TYPE);
+        descriptors.add(PAGINATION_KEEP_ALIVE);
+
+        paginatedPropertyDescriptors = 
Collections.unmodifiableList(descriptors);
+    }
+
+    // output as newline delimited JSON (allows for multiple pages of results 
to be appended to existing FlowFiles without retaining all hits in memory)
+    private final ObjectWriter writer = 
mapper.writer().withRootValueSeparator("\n");
+
+    String paginationType;

Review comment:
       Could this be protected?

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
##########
@@ -1,64 +0,0 @@
-<!DOCTYPE html>

Review comment:
       Is there a reason this was removed?

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+    @WritesAttribute(attribute = "aggregation.name", description = "The name 
of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "aggregation.number", description = "The 
number of the aggregation whose results are in the output flowfile"),
+    @WritesAttribute(attribute = "page.number", description = "The number of 
the page (request) in which the results were returned that are in the output 
flowfile"),
+    @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+        "Search After/Point in Time queries must include a valid \"sort\" field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+        "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

Review comment:
       A comment about naming: `ConsumeElasticsearch` evokes the image of 
consuming a queue or topic, which may be misleading: nothing actually 
disappears (is consumed) from Elasticsearch when the results are processed. 
Instead, this is more of a standing query, right?
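
       To illustrate the standing-query reading: below is a minimal sketch of 
the expiry rule described in the `@Stateful` annotation (state is kept until the 
current time is later than the last query execution plus the keep-alive 
interval). The class and method names are hypothetical and are not taken from 
this PR; the point is only that expiry resets local pagination state and removes 
nothing from the index.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of the PR or of NiFi.
public final class PaginationExpirySketch {
    private PaginationExpirySketch() {
    }

    // Returns true once "now" is strictly later than the last query execution
    // plus the keep-alive interval, i.e. when the Scroll/PiT would have expired.
    static boolean isExpired(final Instant lastQuery, final Duration keepAlive, final Instant now) {
        return now.isAfter(lastQuery.plus(keepAlive));
    }

    public static void main(final String[] args) {
        final Instant lastQuery = Instant.parse("2021-07-23T10:00:00Z");
        final Duration keepAlive = Duration.ofMinutes(10);

        // One minute after the last query: pagination state is still valid.
        System.out.println(isExpired(lastQuery, keepAlive, lastQuery.plusSeconds(60)));
        // Eleven minutes after the last query: state has expired and the query restarts.
        System.out.println(isExpired(lastQuery, keepAlive, lastQuery.plusSeconds(660)));
    }
}
```

After expiry the same query would simply be run again from the start, which is 
why a name suggesting a repeated or standing search may fit better than 
"Consume".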

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
##########
@@ -1,48 +0,0 @@
-<!DOCTYPE html>

Review comment:
       Is there a reason this was removed?

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,67 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PaginatedJsonQueryElasticsearch</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+    to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster in a paginated manner.

Review comment:
       Same comment as above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
