gresockj commented on a change in pull request #5193:
URL: https://github.com/apache/nifi/pull/5193#discussion_r675468689
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
##########
@@ -25,11 +25,11 @@
* covers all CRUD-related operations that can be executed against an
Elasticsearch index with documents.
*/
public class IndexOperationRequest {
- private String index;
- private String type;
- private String id;
- private Map<String, Object> fields;
- private Operation operation;
+ private final String index;
+ private final String type;
+ private final String id;
+ private final Map<String, Object> fields;
+ private final Operation operation;
public IndexOperationRequest(String index, String type, String id,
Map<String, Object> fields, Operation operation) {
Review comment:
Perhaps also a good time to make these parameters final.
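As a rough illustration of the suggestion (a sketch only, not the actual class): `IndexOperationRequestSketch` and `SketchOperation` below are hypothetical stand-ins so the example compiles on its own, and the defensive copy of `fields` is an extra that goes beyond what is asked for here. It assumes `fields` is never null.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the real Operation type, declared only so the sketch compiles.
enum SketchOperation { INDEX, UPDATE, DELETE }

final class IndexOperationRequestSketch {
    private final String index;
    private final String type;
    private final String id;
    private final Map<String, Object> fields;
    private final SketchOperation operation;

    // Final parameters cannot be reassigned inside the constructor body.
    IndexOperationRequestSketch(final String index, final String type, final String id,
                                final Map<String, Object> fields, final SketchOperation operation) {
        this.index = index;
        this.type = type;
        this.id = id;
        // Defensive, read-only copy (an assumption, not part of the review request):
        // later changes to the caller's map do not leak into this request.
        this.fields = Collections.unmodifiableMap(new HashMap<>(fields));
        this.operation = operation;
    }

    String getIndex() { return index; }
    String getType() { return type; }
    String getId() { return id; }
    Map<String, Object> getFields() { return fields; }
    SketchOperation getOperation() { return operation; }

    public static void main(final String[] args) {
        final Map<String, Object> doc = new HashMap<>();
        doc.put("msg", "hello");
        final IndexOperationRequestSketch request =
                new IndexOperationRequestSketch("logs", "_doc", "1", doc, SketchOperation.INDEX);
        doc.put("added-later", true);
        System.out.println(request.getFields().keySet()); // prints [msg]
    }
}
```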
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -165,10 +165,12 @@ private void setupClient(ConfigurationContext context)
throws MalformedURLExcept
this.client = builder.build();
}
- private Response runQuery(String endpoint, String query, String index,
String type) {
- StringBuilder sb = new StringBuilder()
- .append("/").append(index);
- if (type != null && !type.equals("")) {
+ private Response runQuery(String endpoint, String query, String index,
String type, Map<String, String> requestParameters) {
Review comment:
Consider making the parameters and the StringBuilder final here.
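For illustration, a minimal sketch of the path-building part with final parameters and a final StringBuilder. `QueryPathSketch` and `buildPath` are hypothetical names, and the assumption that the endpoint is appended after the optional type goes beyond what this hunk shows.

```java
final class QueryPathSketch {
    // Hypothetical helper: builds "/index[/type]/endpoint".
    static String buildPath(final String endpoint, final String index, final String type) {
        // The reference is final; the builder's contents can still be appended to.
        final StringBuilder sb = new StringBuilder().append("/").append(index);
        if (type != null && !type.isEmpty()) {
            sb.append("/").append(type);
        }
        // Assumed layout: the endpoint comes last.
        sb.append("/").append(endpoint);
        return sb.toString();
    }

    public static void main(final String[] args) {
        System.out.println(buildPath("_search", "logs", null));   // prints /logs/_search
        System.out.println(buildPath("_search", "logs", "_doc")); // prints /logs/_doc/_search
    }
}
```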
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
##########
@@ -159,62 +166,141 @@
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-artifact</artifactId>
+ <version>3.6.3</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-json</artifactId>
<version>${nifi.groovy.version}</version>
<scope>test</scope>
- </dependency>
- <dependency>
+ </dependency>
+ <dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>5.6.16</version>
</dependency>
</dependencies>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>3.0.0-M3</version>
+ </plugin>
+ <plugin>
+ <groupId>com.github.alexcojocaru</groupId>
+ <artifactId>elasticsearch-maven-plugin</artifactId>
+ <version>6.19</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
<profiles>
<profile>
+ <!-- use with integration-tests only (-default can be used if
x-pack-ml permissions fixed) -->
<id>integration-6</id>
Review comment:
Can you provide some examples of correctly running the integration tests?
When running `mvn clean install -Pintegration-tests -Pintegration-6` and
`mvn clean install -Pintegration-test -Pintegration-7`, I get the following
error:
```
[ERROR] Failed to execute goal
com.github.alexcojocaru:elasticsearch-maven-plugin:6.19:runforked
(start-elasticsearch) on project nifi-elasticsearch-client-service:
Elasticsearch [0]: Cannot execute command '[bin/elasticsearch-plugin, list]' in
directory
'/Users/jgresock/workspace/nifi.PR/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/target/elasticsearch0'
[ERROR] Output:
[ERROR]
[ERROR] Error:
[ERROR] Exception in thread "main" java.lang.NoSuchMethodError:
org.elasticsearch.cli.MultiCommand.<init>(Ljava/lang/String;)V
[ERROR] at org.elasticsearch.plugins.PluginCli.<init>(PluginCli.java:39)
[ERROR] at org.elasticsearch.plugins.PluginCli.main(PluginCli.java:47):
Process exited with an error: 1 (Exit value: 1)
```
Not sure if this is user error, but I wanted to bring it up.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends
JsonQueryParameters> extends AbstractProcessor implements
ElasticsearchRestProcessor {
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder().name("original")
+ .description("All original flowfiles that don't cause an error to
occur go to this relationship.").build();
+ public static final Relationship REL_HITS = new
Relationship.Builder().name("hits")
+ .description("Search hits are routed to this relationship.")
+ .build();
+ public static final Relationship REL_AGGREGATIONS = new
Relationship.Builder().name("aggregations")
+ .description("Aggregations are routed to this relationship.")
+ .build();
+
+ public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+ "splitUp-yes",
+ "Yes",
+ "Split up results."
+ );
+ public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+ "splitUp-no",
+ "No",
+ "Don't split up results."
+ );
+
+ public static final PropertyDescriptor SPLIT_UP_HITS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-hits")
+ .displayName("Split up search results")
+ .description("Split up search results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+ public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-aggregations")
+ .displayName("Split up aggregation results")
+ .description("Split up aggregation results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ AtomicReference<ElasticSearchClientService> clientService;
+ String splitUpHits;
+ private String splitUpAggregations;
+
+ final ObjectMapper mapper = new ObjectMapper();
+
+ static {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_ORIGINAL);
+ rels.add(REL_FAILURE);
+ rels.add(REL_HITS);
+ rels.add(REL_AGGREGATIONS);
+ relationships = Collections.unmodifiableSet(rels);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
Review comment:
What do you think about adding a BATCH_SIZE property similar to
`ListS3`, which would commit the session after the configured batch size? My
concern is that on very large result sets, this processor could be running for
a long time without emitting any flow files. If BATCH_SIZE is unset, you could
use the existing behavior.
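To make the idea concrete, here is a rough, framework-free sketch of the commit pattern. `Session` is a hypothetical stand-in for the two session operations involved, not the NiFi `ProcessSession` API, and a `batchSize` of 0 or less models the property being unset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchCommitSketch {
    // Hypothetical stand-in for the session operations this sketch needs.
    interface Session {
        void transfer(String hit);
        void commit();
    }

    // Records the size of each commit so the behaviour can be inspected.
    static final class RecordingSession implements Session {
        final List<String> pending = new ArrayList<>();
        final List<Integer> commitSizes = new ArrayList<>();

        @Override
        public void transfer(final String hit) { pending.add(hit); }

        @Override
        public void commit() {
            commitSizes.add(pending.size());
            pending.clear();
        }
    }

    // batchSize <= 0 means "unset": everything is committed once at the end.
    static void emit(final List<List<String>> pages, final int batchSize, final Session session) {
        int sinceCommit = 0;
        for (final List<String> page : pages) {
            for (final String hit : page) {
                session.transfer(hit);
                sinceCommit++;
                if (batchSize > 0 && sinceCommit >= batchSize) {
                    session.commit(); // downstream sees output before the query finishes
                    sinceCommit = 0;
                }
            }
        }
        if (sinceCommit > 0) {
            session.commit(); // flush whatever is left
        }
    }

    public static void main(final String[] args) {
        final List<List<String>> pages = Arrays.asList(
                Arrays.asList("a", "b", "c"), Arrays.asList("d", "e"));
        final RecordingSession batched = new RecordingSession();
        emit(pages, 2, batched);
        System.out.println(batched.commitSizes); // prints [2, 2, 1]
    }
}
```

With the batch size unset, the same five hits are committed in one go at the end, which matches the current behaviour described above.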
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends
JsonQueryParameters> extends AbstractProcessor implements
ElasticsearchRestProcessor {
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder().name("original")
+ .description("All original flowfiles that don't cause an error to
occur go to this relationship.").build();
+ public static final Relationship REL_HITS = new
Relationship.Builder().name("hits")
+ .description("Search hits are routed to this relationship.")
+ .build();
+ public static final Relationship REL_AGGREGATIONS = new
Relationship.Builder().name("aggregations")
+ .description("Aggregations are routed to this relationship.")
+ .build();
+
+ public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+ "splitUp-yes",
+ "Yes",
+ "Split up results."
+ );
+ public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+ "splitUp-no",
+ "No",
+ "Don't split up results."
+ );
+
+ public static final PropertyDescriptor SPLIT_UP_HITS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-hits")
+ .displayName("Split up search results")
+ .description("Split up search results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+ public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-aggregations")
+ .displayName("Split up aggregation results")
+ .description("Split up aggregation results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ AtomicReference<ElasticSearchClientService> clientService;
+ String splitUpHits;
+ private String splitUpAggregations;
+
+ final ObjectMapper mapper = new ObjectMapper();
+
+ static {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_ORIGINAL);
+ rels.add(REL_FAILURE);
+ rels.add(REL_HITS);
+ rels.add(REL_AGGREGATIONS);
+ relationships = Collections.unmodifiableSet(rels);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY);
+ descriptors.add(QUERY_ATTRIBUTE);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(CLIENT_SERVICE);
+ descriptors.add(SPLIT_UP_HITS);
+ descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ clientService = new
AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+ splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+ splitUpAggregations =
context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+ }
+
+ @OnStopped
+ public void onStopped() {
+ this.clientService = null;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+ FlowFile input = null;
+ if (context.hasIncomingConnection()) {
+ input = session.get();
+
+ if (input == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ try {
+ final Q queryJsonParameters = buildJsonQueryParameters(input,
context, session);
+
+ List<FlowFile> hitsFlowFiles = new ArrayList<>();
+ final StopWatch stopWatch = new StopWatch(true);
+ final SearchResponse response = doQuery(queryJsonParameters,
hitsFlowFiles, session, input, stopWatch);
+
+ finishQuery(input, queryJsonParameters, session, response);
+ } catch (Exception ex) {
+ getLogger().error("Error processing flowfile.", ex);
+ if (input != null) {
+ session.transfer(input, REL_FAILURE);
+ }
+ context.yield();
+ }
+ }
+
+ abstract Q buildJsonQueryParameters(final FlowFile input, final
ProcessContext context, final ProcessSession session) throws IOException;
Review comment:
Why not protected here?
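For context, a small sketch of what is meant: with `protected` hooks, a subclass in a different package can still override them, whereas package-private hooks limit subclasses to the same package. All class and method names below are hypothetical and only mirror the shape of the processor.

```java
import java.util.Locale;

abstract class AbstractQuerySketch<Q> {
    // Template method: the public entry point delegates to the hooks below.
    public final String run(final String input) {
        final Q params = buildParameters(input);
        return finish(params);
    }

    // protected rather than package-private, so subclasses outside this package can override.
    protected abstract Q buildParameters(String input);

    protected abstract String finish(Q params);
}

final class UpperCaseQuerySketch extends AbstractQuerySketch<String> {
    @Override
    protected String buildParameters(final String input) {
        return input.trim();
    }

    @Override
    protected String finish(final String params) {
        return params.toUpperCase(Locale.ROOT);
    }

    public static void main(final String[] args) {
        System.out.println(new UpperCaseQuerySketch().run(" abc ")); // prints ABC
    }
}
```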
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends
JsonQueryParameters> extends AbstractProcessor implements
ElasticsearchRestProcessor {
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder().name("original")
+ .description("All original flowfiles that don't cause an error to
occur go to this relationship.").build();
+ public static final Relationship REL_HITS = new
Relationship.Builder().name("hits")
+ .description("Search hits are routed to this relationship.")
+ .build();
+ public static final Relationship REL_AGGREGATIONS = new
Relationship.Builder().name("aggregations")
+ .description("Aggregations are routed to this relationship.")
+ .build();
+
+ public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+ "splitUp-yes",
+ "Yes",
+ "Split up results."
+ );
+ public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+ "splitUp-no",
+ "No",
+ "Don't split up results."
+ );
+
+ public static final PropertyDescriptor SPLIT_UP_HITS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-hits")
+ .displayName("Split up search results")
+ .description("Split up search results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+ public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-aggregations")
+ .displayName("Split up aggregation results")
+ .description("Split up aggregation results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ AtomicReference<ElasticSearchClientService> clientService;
Review comment:
Is there a reason you chose package scope instead of protected for
clientService and splitUpHits?
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>ConsumeElasticsearch</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css" />
+</head>
+<body>
+ <p>This processor is intended for use with the Elasticsearch JSON DSL and
Elasticsearch 5.X and newer. It is designed
+ to be able to take a query from Kibana and execute it as-is against an
Elasticsearch cluster in a paginated manner.
Review comment:
Since the query doesn't have to be from Kibana, what about "be able to
take a JSON query (e.g., from Kibana) and execute it..."?
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description =
"application/json"),
+ @WritesAttribute(attribute = "aggregation.name", description = "The name
of the aggregation whose results are in the output flowfile"),
+ @WritesAttribute(attribute = "aggregation.number", description = "The
number of the aggregation whose results are in the output flowfile"),
+ @WritesAttribute(attribute = "page.number", description = "The number of
the page (request) in which the results were returned that are in the output
flowfile"),
+ @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "read",
"json"})
+@CapabilityDescription("A processor that allows the user to run a paginated
query (with aggregations) written with the Elasticsearch JSON DSL. " +
+ "It will use the flowfile's content for the query unless the QUERY
attribute is populated. " +
+ "Search After/Point in Time queries must include a valid \"sort\"
field.")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"Care should be taken on the size of each page because each response " +
+ "from Elasticsearch will be loaded into memory all at once and
converted into the resulting flowfiles.")
+public class PaginatedJsonQueryElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ static {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY);
+ descriptors.addAll(paginatedPropertyDescriptors);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ void finishQuery(final FlowFile input, final PaginatedJsonQueryParameters
paginatedQueryJsonParameters,
Review comment:
Consider protected methods here.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends
JsonQueryParameters> extends AbstractProcessor implements
ElasticsearchRestProcessor {
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder().name("original")
+ .description("All original flowfiles that don't cause an error to
occur go to this relationship.").build();
+ public static final Relationship REL_HITS = new
Relationship.Builder().name("hits")
+ .description("Search hits are routed to this relationship.")
+ .build();
+ public static final Relationship REL_AGGREGATIONS = new
Relationship.Builder().name("aggregations")
+ .description("Aggregations are routed to this relationship.")
+ .build();
+
+ public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+ "splitUp-yes",
+ "Yes",
+ "Split up results."
+ );
+ public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+ "splitUp-no",
+ "No",
+ "Don't split up results."
+ );
+
+ public static final PropertyDescriptor SPLIT_UP_HITS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-hits")
+ .displayName("Split up search results")
+ .description("Split up search results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+ public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-aggregations")
+ .displayName("Split up aggregation results")
+ .description("Split up aggregation results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ AtomicReference<ElasticSearchClientService> clientService;
+ String splitUpHits;
+ private String splitUpAggregations;
+
+ final ObjectMapper mapper = new ObjectMapper();
Review comment:
Also, I just noticed that you have an ObjectWriter in
`AbstractPaginatedJsonQueryElasticsearch` -- any way you can combine these?
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,67 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>PaginatedJsonQueryElasticsearch</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css" />
+</head>
+<body>
+ <p>This processor is intended for use with the Elasticsearch JSON DSL and
Elasticsearch 5.X and newer. It is designed
+ to be able to take a query from Kibana and execute it as-is against an
Elasticsearch cluster in a paginated manner.
+ Like all processors in the "restapi" bundle, it uses the official Elastic
client APIs, so it supports leader detection.</p>
+ <p>The query to execute can be provided either in the Query configuration
property or in an attribute on a flowfile. In
+ the latter case, the name of the attribute (Expression Language is
supported here) must be provided in the Query
+ property. If no Query property is provided and a flowfile is received, the
content of the flowfile is used as the Query JSON.</p>
Review comment:
This seems unclear to me based on the actual functionality. How about:
```suggestion
<p>The query JSON to execute can be provided either in the Query
configuration property or in the content of the
flowfile. If the Query Attribute property is configured, the executed
query JSON will be placed in the attribute provided by this property.</p>
```
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
}
@Override
- public SearchResponse search(String query, String index, String type) {
- Response response = runQuery("_search", query, index, type);
- Map<String, Object> parsed = parseResponse(response);
+ public SearchResponse search(String query, String index, String type,
Map<String, String> requestParameters) {
+ try {
+ final Response response = runQuery("_search", query, index, type,
requestParameters);
+ return buildSearchResponse(response);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public SearchResponse scroll(String scroll) {
+ try {
+ final HttpEntity scrollEntity = new NStringEntity(scroll,
ContentType.APPLICATION_JSON);
+ final Response response = client.performRequest("POST",
"/_search/scroll", Collections.emptyMap(), scrollEntity);
+
+ return buildSearchResponse(response);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public String initialisePointInTime(String index, String keepAlive) {
Review comment:
Consider making the parameters final.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
}
@Override
- public SearchResponse search(String query, String index, String type) {
- Response response = runQuery("_search", query, index, type);
- Map<String, Object> parsed = parseResponse(response);
+ public SearchResponse search(String query, String index, String type,
Map<String, String> requestParameters) {
+ try {
+ final Response response = runQuery("_search", query, index, type,
requestParameters);
+ return buildSearchResponse(response);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public SearchResponse scroll(String scroll) {
+ try {
+ final HttpEntity scrollEntity = new NStringEntity(scroll,
ContentType.APPLICATION_JSON);
+ final Response response = client.performRequest("POST",
"/_search/scroll", Collections.emptyMap(), scrollEntity);
+
+ return buildSearchResponse(response);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public String initialisePointInTime(String index, String keepAlive) {
+ try {
+ final Map<String, String> params = new HashMap<String, String>() {{
+ if (StringUtils.isNotBlank(keepAlive)) {
+ put("keep_alive", keepAlive);
+ }
+ }};
+ final Response response = client.performRequest("POST", index +
"/_pit", params);
+ final String body =
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
Review comment:
Consider declaring a Charset constant for UTF-8, since it's used in
multiple places here.
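As a minimal sketch of what I mean (the class and helper names below are hypothetical stand-ins, not code from this PR, and only the JDK is used):

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class CharsetConstantSketch {
    // One shared constant instead of repeating StandardCharsets.UTF_8 at each call site.
    private static final Charset BODY_CHARSET = StandardCharsets.UTF_8;

    // Hypothetical helper standing in for the places that decode a response body.
    static String decodeBody(final byte[] body) {
        return new String(body, BODY_CHARSET);
    }

    // Hypothetical helper standing in for the places that encode a request body.
    static byte[] encodeBody(final String body) {
        return body.getBytes(BODY_CHARSET);
    }

    public static void main(final String[] args) {
        // Round-trips a small JSON body through the shared charset.
        System.out.println(decodeBody(encodeBody("{\"keep_alive\":\"1m\"}")));
    }
}
```

The benefit is mostly readability: the encoding choice is stated once, and every call site refers to it by name.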
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
}
@Override
- public SearchResponse search(String query, String index, String type) {
- Response response = runQuery("_search", query, index, type);
- Map<String, Object> parsed = parseResponse(response);
+ public SearchResponse search(String query, String index, String type,
Map<String, String> requestParameters) {
+ try {
+ final Response response = runQuery("_search", query, index, type,
requestParameters);
+ return buildSearchResponse(response);
Review comment:
One thing I noticed during testing was that if I provided a 'type' value
in Elasticsearch 7, the query completed with 0 results. I understand that this
would be a user error, but I also noticed the following in the logs:
```
2021-07-23 06:22:59,910 WARN [I/O dispatcher 1]
org.elasticsearch.client.RestClient request [POST
http://localhost:9200/my_index/test/_search?scroll=600s] returned 2 warnings:
[299 Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942
"Elasticsearch built-in security features are not enabled. Without
authentication, your cluster could be accessible to anyone. See
https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html
to enable security."],[299
Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 "[types removal]
Specifying types in search requests is deprecated."]
2021-07-23 06:22:59,919 WARN [I/O dispatcher 1]
org.elasticsearch.client.RestClient request [DELETE
http://localhost:9200/_search/scroll] returned 1 warnings: [299
Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 "Elasticsearch
built-in security features are not enabled. Without authentication, your
cluster could be accessible to anyone. See
https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html
to enable security."]
```
It appears these warnings are pulled from the HTTP Response Headers, which
are available in the Response object, so if you wanted, you could add these
warnings to the SearchResponse in order to propagate the warnings to the
processor's logger. On the other hand, the user could view the nifi-app.log to
find this warning.
This PR is already quite substantial, so I'll leave it up to you as to
whether to include this here (or not), or whether it could be covered in a
future JIRA issue.
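If it helps, here is a rough sketch of pulling the human-readable text out of raw `Warning` header values like the ones in the log above. It assumes the header values have already been read from the Response object (that lookup is not shown here) and that each value has no trailing warn-date, as in the log above. `WarningHeaderSketch` and `extractWarnings` are hypothetical names, not part of this PR or of the Elasticsearch client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WarningHeaderSketch {
    // Extracts the quoted warn-text from each raw "Warning" header value.
    // Values without a quoted section are passed through unchanged.
    static List<String> extractWarnings(final List<String> rawHeaderValues) {
        final List<String> warnings = new ArrayList<>();
        for (final String raw : rawHeaderValues) {
            final int start = raw.indexOf('"');
            final int end = raw.lastIndexOf('"');
            if (start >= 0 && end > start) {
                warnings.add(raw.substring(start + 1, end));
            } else {
                warnings.add(raw);
            }
        }
        return warnings;
    }

    public static void main(final String[] args) {
        final List<String> raw = Arrays.asList(
                "299 Elasticsearch-7.13.4-c5f60e894ca0c61cdbae4f5a686d9f08bcefc942 "
                        + "\"[types removal] Specifying types in search requests is deprecated.\"");
        // The processor could log each extracted warning at WARN level instead of printing it.
        extractWarnings(raw).forEach(System.out::println);
    }
}
```

A list of strings like this could then be carried on the SearchResponse so the processor can decide how to surface it.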
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
##########
@@ -111,11 +112,11 @@ default String getQuery(FlowFile input, ProcessContext
context, ProcessSession s
if (context.getProperty(QUERY).isSet()) {
retVal =
context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
} else if (input != null) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
session.exportTo(input, out);
out.close();
- retVal = new String(out.toByteArray());
+ retVal = out.toString();
Review comment:
What do you think about passing the UTF-8 charset into toString()?
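Something along these lines, as a sketch only. `ExplicitCharsetSketch` and `decode` are hypothetical names; the point is the `toString(String charsetName)` overload, which is available on Java 8, whereas the `toString(Charset)` overload only arrived in Java 10.

```java
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

final class ExplicitCharsetSketch {
    // Decodes the buffered bytes as UTF-8 rather than with the platform default charset.
    static String decode(final byte[] content) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(content, 0, content.length);
        try {
            return out.toString(StandardCharsets.UTF_8.name());
        } catch (final UnsupportedEncodingException e) {
            // Every JVM is required to support UTF-8, so this should be unreachable.
            throw new IllegalStateException("UTF-8 is not available", e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(decode("{\"query\":{\"match_all\":{}}}".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Without the explicit charset, the no-argument `toString()` decodes with the platform default, so a query containing non-ASCII characters could be read differently from one host to another.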
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java
##########
@@ -40,7 +40,8 @@ public boolean hasErrors() {
return hasErrors;
}
- public static IndexOperationResponse fromJsonResponse(String response)
throws IOException {
+ @SuppressWarnings("unchecked")
+ public static IndexOperationResponse fromJsonResponse(final String
response) throws IOException {
ObjectMapper mapper = new ObjectMapper();
Review comment:
I realize this wasn't part of your PR, but it could be a good time to
improve the usage of ObjectMapper here. I feel like this class could store a
static final ObjectReader, since creating an ObjectMapper is a (relatively)
expensive operation.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
}
@Override
- public SearchResponse search(String query, String index, String type) {
- Response response = runQuery("_search", query, index, type);
- Map<String, Object> parsed = parseResponse(response);
+ public SearchResponse search(String query, String index, String type,
Map<String, String> requestParameters) {
+ try {
+ final Response response = runQuery("_search", query, index, type,
requestParameters);
+ return buildSearchResponse(response);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public SearchResponse scroll(String scroll) {
Review comment:
Consider making the parameters final.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends
JsonQueryParameters> extends AbstractProcessor implements
ElasticsearchRestProcessor {
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder().name("original")
+ .description("All original flowfiles that don't cause an error to
occur go to this relationship.").build();
+ public static final Relationship REL_HITS = new
Relationship.Builder().name("hits")
+ .description("Search hits are routed to this relationship.")
+ .build();
+ public static final Relationship REL_AGGREGATIONS = new
Relationship.Builder().name("aggregations")
+ .description("Aggregations are routed to this relationship.")
+ .build();
+
+ public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+ "splitUp-yes",
+ "Yes",
+ "Split up results."
+ );
+ public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+ "splitUp-no",
+ "No",
+ "Don't split up results."
+ );
+
+ public static final PropertyDescriptor SPLIT_UP_HITS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-hits")
+ .displayName("Split up search results")
+ .description("Split up search results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+ public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-aggregations")
+ .displayName("Split up aggregation results")
+ .description("Split up aggregation results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ AtomicReference<ElasticSearchClientService> clientService;
+ String splitUpHits;
+ private String splitUpAggregations;
+
+ final ObjectMapper mapper = new ObjectMapper();
Review comment:
For clarity, since you only use write methods, what about making this
the ObjectWriter?
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -375,44 +383,143 @@ private int handleSearchCount(Object raw) {
}
@Override
- public SearchResponse search(String query, String index, String type) {
- Response response = runQuery("_search", query, index, type);
- Map<String, Object> parsed = parseResponse(response);
+ public SearchResponse search(String query, String index, String type,
Map<String, String> requestParameters) {
+ try {
+ final Response response = runQuery("_search", query, index, type,
requestParameters);
+ return buildSearchResponse(response);
+ } catch (Exception ex) {
Review comment:
Not that it adds a huge amount of value here, but consider declaring the
exception as final.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends
AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+ public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue(
Review comment:
The "Split up..." properties are the only part that doesn't feel natural
to me. What about the following scheme:
**Search Results Split** property, allowable values:
1. Flow file per result
2. Flow file per page
3. Flow file per query
**Aggregation Results Split** property, allowable values:
1. Flow file per result
2. Flow file per query
... or some variant thereof. I had to run it through the processor with
each type to figure out the difference between Split = No vs. Split = Combine.
Edit: I understand now that these properties already existed in
JsonQueryElasticsearch and you're just trying to reuse them. I wonder if it
would be better to make the properties in your new processors more intuitive
rather than tying them to JsonQueryElasticsearch.
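To make the proposed scheme concrete, here is a small sketch of how I read the three search result options. The enum, its constant names, and `flowFileCount` are all hypothetical and only illustrate the intended output counts; they are not a proposal for the actual property implementation.

```java
final class ResultSplitSketch {
    // Hypothetical allowable values for a "Search Results Split" property.
    enum SearchResultsSplit {
        PER_HIT("Flow file per result"),
        PER_PAGE("Flow file per page"),
        PER_QUERY("Flow file per query");

        private final String displayName;

        SearchResultsSplit(final String displayName) {
            this.displayName = displayName;
        }

        String getDisplayName() {
            return displayName;
        }
    }

    // Number of "hits" flow files one query would emit, given the hit count of each page.
    static int flowFileCount(final SearchResultsSplit split, final int[] hitsPerPage) {
        switch (split) {
            case PER_HIT: {
                int total = 0;
                for (final int hits : hitsPerPage) {
                    total += hits;
                }
                return total;
            }
            case PER_PAGE:
                return hitsPerPage.length;
            case PER_QUERY:
                return 1;
            default:
                throw new IllegalArgumentException("Unknown split: " + split);
        }
    }

    public static void main(final String[] args) {
        final int[] pages = {10, 10, 5};
        for (final SearchResultsSplit split : SearchResultsSplit.values()) {
            System.out.println(split.getDisplayName() + ": " + flowFileCount(split, pages));
        }
    }
}
```

With display names like these, the difference between the options is visible in the property itself rather than only after running the processor.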
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends
JsonQueryParameters> extends AbstractProcessor implements
ElasticsearchRestProcessor {
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder().name("original")
+ .description("All original flowfiles that don't cause an error to
occur go to this relationship.").build();
+ public static final Relationship REL_HITS = new
Relationship.Builder().name("hits")
+ .description("Search hits are routed to this relationship.")
+ .build();
+ public static final Relationship REL_AGGREGATIONS = new
Relationship.Builder().name("aggregations")
+ .description("Aggregations are routed to this relationship.")
+ .build();
+
+ public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+ "splitUp-yes",
+ "Yes",
+ "Split up results."
+ );
+ public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+ "splitUp-no",
+ "No",
+ "Don't split up results."
+ );
+
+ public static final PropertyDescriptor SPLIT_UP_HITS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-hits")
+ .displayName("Split up search results")
+ .description("Split up search results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+ public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-aggregations")
+ .displayName("Split up aggregation results")
+ .description("Split up aggregation results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ AtomicReference<ElasticSearchClientService> clientService;
+ String splitUpHits;
+ private String splitUpAggregations;
+
+ final ObjectMapper mapper = new ObjectMapper();
+
+ static {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_ORIGINAL);
+ rels.add(REL_FAILURE);
+ rels.add(REL_HITS);
+ rels.add(REL_AGGREGATIONS);
+ relationships = Collections.unmodifiableSet(rels);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY);
+ descriptors.add(QUERY_ATTRIBUTE);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(CLIENT_SERVICE);
+ descriptors.add(SPLIT_UP_HITS);
+ descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ clientService = new
AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+ splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+ splitUpAggregations =
context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+ }
+
+ @OnStopped
+ public void onStopped() {
+ this.clientService = null;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+ FlowFile input = null;
+ if (context.hasIncomingConnection()) {
+ input = session.get();
+
+ if (input == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ try {
+ final Q queryJsonParameters = buildJsonQueryParameters(input,
context, session);
+
+ List<FlowFile> hitsFlowFiles = new ArrayList<>();
+ final StopWatch stopWatch = new StopWatch(true);
+ final SearchResponse response = doQuery(queryJsonParameters,
hitsFlowFiles, session, input, stopWatch);
+
+ finishQuery(input, queryJsonParameters, session, response);
+ } catch (Exception ex) {
+ getLogger().error("Error processing flowfile.", ex);
+ if (input != null) {
+ session.transfer(input, REL_FAILURE);
+ }
+ context.yield();
+ }
+ }
+
+ abstract Q buildJsonQueryParameters(final FlowFile input, final
ProcessContext context, final ProcessSession session) throws IOException;
+
+ void populateCommonJsonQueryParameters(final Q queryJsonParameters, final
FlowFile input, final ProcessContext context,
+ final ProcessSession session)
throws IOException {
+ final String query = getQuery(input, context, session);
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+ final String type =
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+ final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+ ?
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+ : null;
+
+ queryJsonParameters.setQuery(query);
+ queryJsonParameters.setIndex(index);
+ queryJsonParameters.setType(type);
+ queryJsonParameters.setQueryAttr(queryAttr);
+ }
+
+ abstract SearchResponse doQuery(final Q queryJsonParameters, final
List<FlowFile> hitsFlowFiles, final ProcessSession session,
Review comment:
Same question for all these methods
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description =
"application/json"),
+ @WritesAttribute(attribute = "aggregation.name", description = "The name
of the aggregation whose results are in the output flowfile"),
+ @WritesAttribute(attribute = "aggregation.number", description = "The
number of the aggregation whose results are in the output flowfile"),
+ @WritesAttribute(attribute = "page.number", description = "The number of
the page (request) in which the results were returned that are in the output
flowfile"),
+ @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page",
"consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a
paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+ "Search After/Point in Time queries must include a valid \"sort\"
field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId,
searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+ "is retained in between invocations of this processor until the
Scroll/PiT has expired " +
+ "(when the current time is later than the last query execution plus
the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"Care should be taken on the size of each page because each response " +
+ "from Elasticsearch will be loaded into memory all at once and
converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
Review comment:
I found what looks like a bug where the state is not actually cleared
once the expirationTimestamp has passed. I configured a ConsumeElasticsearch
processor with a Keep Alive time of 30 seconds and Run Schedule of 1 second,
and used a query that returned 15 pages of results (Split search/aggregation
set to No). Using the Scroll setting, this ran 15 times, and on the 16th time,
it started indicating 404 Errors. Then the 30 seconds expired, and the
processor started producing results again, by emitting Page 1 one time for each
run, over and over. I noted that the state didn't appear to have been cleared,
so pageCount was stuck at 15, expirationTimestamp remained static, etc. Same
thing happened for PIT and SearchAfter, though SearchAfter didn't output any
errors once the result set was complete.
Beyond the state clearing, we should consider how to handle the case where
the result set completes, since we probably don't want to log an ERROR in what
should be considered a successful case.
Also, what is the intended behavior if the query runs longer than the Keep
Alive time? Should the processor start over on the query, or continue with the
result set until the last page is reached, and then start over?
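For discussion, here is a sketch of the kind of check I would expect before each run. It assumes the state is a string map and that expirationTimestamp is stored as epoch milliseconds, which may not match the PR. `PaginationStateSketch` and `stateForNextRun` are hypothetical names. Note that it restarts the query as soon as the timestamp has passed, which is only one of the two behaviours I ask about above, not a recommendation.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

final class PaginationStateSketch {
    static final String EXPIRATION_KEY = "expirationTimestamp";

    // Returns the state the next run should start from: an empty map when the
    // pagination has expired or the previous page was empty, otherwise a copy
    // of the stored state.
    static Map<String, String> stateForNextRun(final Map<String, String> stored,
                                               final Instant now,
                                               final boolean lastPageWasEmpty) {
        final String expiration = stored.get(EXPIRATION_KEY);
        final boolean expired = expiration != null
                && !now.isBefore(Instant.ofEpochMilli(Long.parseLong(expiration)));
        if (expired || lastPageWasEmpty) {
            // Treat both cases as a normal restart rather than an error.
            return new HashMap<>();
        }
        return new HashMap<>(stored);
    }

    public static void main(final String[] args) {
        final Map<String, String> stored = new HashMap<>();
        stored.put(EXPIRATION_KEY, "1000");
        stored.put("pageCount", "15");
        System.out.println(stateForNextRun(stored, Instant.ofEpochMilli(2000), false).isEmpty());
    }
}
```

Treating an empty final page as a reason to reset, instead of issuing another request against a finished scroll, would also avoid the 404 errors I saw on the 16th run.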
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description =
"application/json"),
+ @WritesAttribute(attribute = "aggregation.name", description = "The name
of the aggregation whose results are in the output flowfile"),
+ @WritesAttribute(attribute = "aggregation.number", description = "The
number of the aggregation whose results are in the output flowfile"),
+ @WritesAttribute(attribute = "page.number", description = "The number of
the page (request) in which the results were returned that are in the output
flowfile"),
+ @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page",
"consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a
paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+ "Search After/Point in Time queries must include a valid \"sort\"
field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId,
searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+ "is retained in between invocations of this processor until the
Scroll/PiT has expired " +
+ "(when the current time is later than the last query execution plus
the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"Care should be taken on the size of each page because each response " +
+ "from Elasticsearch will be loaded into memory all at once and
converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
Review comment:
What would you consider the compelling reason to have this processor in
addition to PaginatedJsonQueryElasticsearch? The differences I see are the
convenience of having it be a source processor (whereas you'd have to put a
GenerateFlowFile processor in front of PaginatedJsonQueryElasticsearch to
achieve the same thing), and there is a query expiration for
ConsumeElasticsearch. I'm just wondering if the two processors could be
combined to serve the same purpose.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractJsonQueryElasticsearch<Q extends
JsonQueryParameters> extends AbstractProcessor implements
ElasticsearchRestProcessor {
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder().name("original")
+ .description("All original flowfiles that don't cause an error to
occur go to this relationship.").build();
+ public static final Relationship REL_HITS = new
Relationship.Builder().name("hits")
+ .description("Search hits are routed to this relationship.")
+ .build();
+ public static final Relationship REL_AGGREGATIONS = new
Relationship.Builder().name("aggregations")
+ .description("Aggregations are routed to this relationship.")
+ .build();
+
+ public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
+ "splitUp-yes",
+ "Yes",
+ "Split up results."
+ );
+ public static final AllowableValue SPLIT_UP_NO = new AllowableValue(
+ "splitUp-no",
+ "No",
+ "Don't split up results."
+ );
+
+ public static final PropertyDescriptor SPLIT_UP_HITS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-hits")
+ .displayName("Split up search results")
+ .description("Split up search results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+ public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new
PropertyDescriptor.Builder()
+ .name("el-rest-split-up-aggregations")
+ .displayName("Split up aggregation results")
+ .description("Split up aggregation results into one flowfile per
result.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES)
+ .defaultValue(SPLIT_UP_NO.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ AtomicReference<ElasticSearchClientService> clientService;
+ String splitUpHits;
+ private String splitUpAggregations;
+
+ final ObjectMapper mapper = new ObjectMapper();
+
+ static {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_ORIGINAL);
+ rels.add(REL_FAILURE);
+ rels.add(REL_HITS);
+ rels.add(REL_AGGREGATIONS);
+ relationships = Collections.unmodifiableSet(rels);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY);
+ descriptors.add(QUERY_ATTRIBUTE);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(CLIENT_SERVICE);
+ descriptors.add(SPLIT_UP_HITS);
+ descriptors.add(SPLIT_UP_AGGREGATIONS);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ clientService = new
AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
+
+ splitUpHits = context.getProperty(SPLIT_UP_HITS).getValue();
+ splitUpAggregations =
context.getProperty(SPLIT_UP_AGGREGATIONS).getValue();
+ }
+
+ @OnStopped
+ public void onStopped() {
+ this.clientService = null;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+ FlowFile input = null;
+ if (context.hasIncomingConnection()) {
+ input = session.get();
+
+ if (input == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ try {
+ final Q queryJsonParameters = buildJsonQueryParameters(input,
context, session);
+
+ List<FlowFile> hitsFlowFiles = new ArrayList<>();
+ final StopWatch stopWatch = new StopWatch(true);
+ final SearchResponse response = doQuery(queryJsonParameters,
hitsFlowFiles, session, input, stopWatch);
+
+ finishQuery(input, queryJsonParameters, session, response);
+ } catch (Exception ex) {
+ getLogger().error("Error processing flowfile.", ex);
+ if (input != null) {
+ session.transfer(input, REL_FAILURE);
+ }
+ context.yield();
+ }
+ }
+
+ abstract Q buildJsonQueryParameters(final FlowFile input, final
ProcessContext context, final ProcessSession session) throws IOException;
+
+ void populateCommonJsonQueryParameters(final Q queryJsonParameters, final
FlowFile input, final ProcessContext context,
+ final ProcessSession session)
throws IOException {
+ final String query = getQuery(input, context, session);
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+ final String type =
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+ final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
+ ?
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+ : null;
+
+ queryJsonParameters.setQuery(query);
+ queryJsonParameters.setIndex(index);
+ queryJsonParameters.setType(type);
+ queryJsonParameters.setQueryAttr(queryAttr);
+ }
+
+ abstract SearchResponse doQuery(final Q queryJsonParameters, final
List<FlowFile> hitsFlowFiles, final ProcessSession session,
+ final FlowFile input, final StopWatch
stopWatch) throws IOException;
+
+ abstract void finishQuery(final FlowFile input, final Q queryParameters,
final ProcessSession session,
+ final SearchResponse response) throws
IOException;
+
+ FlowFile createChildFlowFile(final ProcessSession session, final FlowFile
parent) {
+ return parent != null ? session.create(parent) : session.create();
+ }
+
+ private FlowFile writeAggregationFlowFileContents(final String name, final
Integer number, final String json,
+ final ProcessSession
session, final FlowFile aggFlowFile,
+ final Map<String,
String> attributes) {
+ FlowFile ff = session.write(aggFlowFile, out ->
out.write(json.getBytes()));
+ ff = session.putAllAttributes(ff, new HashMap<String, String>(){{
+ if (name != null) {
+ put("aggregation.name", name);
+ }
+ if (number != null) {
+ put("aggregation.number", number.toString());
+ }
+ }});
+
+ return session.putAllAttributes(ff, attributes);
+ }
+
+ private void handleAggregations(final Map<String, Object> aggregations,
final ProcessSession session,
+ final FlowFile parent, final Map<String,
String> attributes,
+ final String transitUri, final StopWatch
stopWatch) throws IOException {
+ if (aggregations != null && !aggregations.isEmpty()) {
+ final List<FlowFile> aggsFlowFiles = new ArrayList<>();
+ if (splitUpAggregations.equals(SPLIT_UP_YES.getValue())) {
+ int aggCount = 0;
+ for (final Map.Entry<String, Object> agg :
aggregations.entrySet()) {
+ final FlowFile aggFlowFile = createChildFlowFile(session,
parent);
+ final String aggJson =
mapper.writeValueAsString(agg.getValue());
+
aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount,
aggJson, session, aggFlowFile, attributes));
+ }
+ } else {
+ final FlowFile aggFlowFile = createChildFlowFile(session,
parent);
+ final String json = mapper.writeValueAsString(aggregations);
+ aggsFlowFiles.add(writeAggregationFlowFileContents(null, null,
json, session, aggFlowFile, attributes));
+ }
+
+ if (!aggsFlowFiles.isEmpty()) {
+ session.transfer(aggsFlowFiles, REL_AGGREGATIONS);
+ aggsFlowFiles.forEach(ff ->
session.getProvenanceReporter().receive(ff, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS)));
+ }
+ }
+ }
+
+ private FlowFile writeHitFlowFile(final int count, final String json,
final ProcessSession session,
+ final FlowFile hitFlowFile, final
Map<String, String> attributes) {
+ final FlowFile ff = session.write(hitFlowFile, out ->
out.write(json.getBytes()));
+ attributes.put("hit.count", Integer.toString(count));
+
+ return session.putAllAttributes(ff, attributes);
+ }
+
+ List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q
queryJsonParameters, final ProcessSession session,
+ final FlowFile parent, final Map<String, String>
attributes, final List<FlowFile> hitsFlowFiles,
+ final String transitUri, final StopWatch
stopWatch) throws IOException {
+ if (hits != null && !hits.isEmpty()) {
+ if (SPLIT_UP_YES.getValue().equals(splitUpHits)) {
+ for (final Map<String, Object> hit : hits) {
+ final FlowFile hitFlowFile = createChildFlowFile(session,
parent);
+ final String json = mapper.writeValueAsString(hit);
+ hitsFlowFiles.add(writeHitFlowFile(1, json, session,
hitFlowFile, attributes));
+ }
+ } else {
+ final FlowFile hitFlowFile = createChildFlowFile(session,
parent);
+ final String json = mapper.writeValueAsString(hits);
+ hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session,
hitFlowFile, attributes));
+ }
+ }
+
+ // output any results
+ if (!hitsFlowFiles.isEmpty()) {
Review comment:
It looks like this method will always return an empty list, then?
Perhaps add a method comment explaining the purpose of the returned
`List<FlowFile>`. Can you walk me through the logic of returning anything if
it will always be empty?
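A minimal sketch of the pattern I think I am seeing, with one possible
alternative. This is an illustration only: the rest of the method is not
visible in this hunk, so the transfer-then-clear step is my assumption, the
class and method names are hypothetical, and plain Strings stand in for
FlowFiles.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the method under review; Strings replace FlowFiles.
final class HandleHitsSketch {

    private HandleHitsSketch() {
    }

    // Assumed current shape: the accumulator is transferred and cleared
    // before it is returned, so callers always receive an empty list.
    static List<String> transferAndReturn(final List<String> hits, final List<String> accumulator,
                                          final List<String> transferred) {
        accumulator.addAll(hits);
        if (!accumulator.isEmpty()) {
            transferred.addAll(accumulator);
            accumulator.clear();
        }
        return accumulator;
    }

    // One alternative: transfer the hits and report how many were sent.
    static int transferAndCount(final List<String> hits, final List<String> transferred) {
        transferred.addAll(hits);
        return hits.size();
    }
}
```

If the list is instead meant to carry FlowFiles over to the next page (for
example when combining pages), a Javadoc comment saying so would answer the
question.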
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractPaginatedJsonQueryElasticsearch extends
AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
+ public static final AllowableValue SPLIT_UP_COMBINE = new AllowableValue(
+ "splitUp-combine",
+ "Combine",
+ "Combine results from all responses (one flowfile per entire
paginated result set of hits). " +
+ "Note that aggregations cannot be paged, they are
generated across the entire result set and " +
+ "returned as part of the first page. Results are output
with one JSON object per line " +
+ "(allowing hits to be combined from multiple pages without
loading all results into memory)."
+ );
+
+ public static final PropertyDescriptor SPLIT_UP_HITS = new
PropertyDescriptor.Builder()
+
.fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SPLIT_UP_HITS)
+ .description("Split up search results into one flowfile per result
or combine all hits from all pages into a single flowfile.")
+ .allowableValues(SPLIT_UP_NO, SPLIT_UP_YES, SPLIT_UP_COMBINE)
+ .build();
+
+ public static final AllowableValue PAGINATION_SEARCH_AFTER = new
AllowableValue(
+ "pagination-search_after",
+ "Search After",
+ "Use Elasticsearch \"search_after\" to page sorted results."
+ );
+ public static final AllowableValue PAGINATION_POINT_IN_TIME = new
AllowableValue(
+ "pagination-pit",
+ "Point in Time",
+ "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page
sorted results."
+ );
+ public static final AllowableValue PAGINATION_SCROLL = new AllowableValue(
+ "pagination-scroll",
+ "Scroll",
+ "Use Elasticsearch \"scroll\" to page results."
+ );
+
+ public static final PropertyDescriptor PAGINATION_TYPE = new
PropertyDescriptor.Builder()
+ .name("el-rest-pagination-type")
+ .displayName("Pagination Type")
+ .description("Pagination method to use. Not all types are
available for all Elasticsearch versions, " +
+ "check the Elasticsearch docs to confirm which are
applicable and recommended for your service.")
+ .allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER,
PAGINATION_POINT_IN_TIME)
+ .defaultValue(PAGINATION_SCROLL.getValue())
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor PAGINATION_KEEP_ALIVE = new
PropertyDescriptor.Builder()
+ .name("el-rest-pagination-keep-alive")
+ .displayName("Pagination Keep Alive")
+ .description("Pagination \"keep_alive\" period. Period
Elasticsearch will keep the scroll/pit cursor alive " +
+ "in between requests (this is not the time expected for
all pages to be returned, but the maximum " +
+ "allowed time between requests for page retrieval).")
+ .required(true)
+ .defaultValue("10 mins")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.createTimePeriodValidator(1,
TimeUnit.SECONDS, 24, TimeUnit.HOURS))
+ .build();
+
+ static final List<PropertyDescriptor> paginatedPropertyDescriptors;
+ static {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY_ATTRIBUTE);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(CLIENT_SERVICE);
+ descriptors.add(SPLIT_UP_HITS);
+ descriptors.add(SPLIT_UP_AGGREGATIONS);
+ descriptors.add(PAGINATION_TYPE);
+ descriptors.add(PAGINATION_KEEP_ALIVE);
+
+ paginatedPropertyDescriptors =
Collections.unmodifiableList(descriptors);
+ }
+
+ // output as newline delimited JSON (allows for multiple pages of results
to be appended to existing FlowFiles without retaining all hits in memory)
+ private final ObjectWriter writer =
mapper.writer().withRootValueSeparator("\n");
+
+ String paginationType;
Review comment:
Could this be protected?
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
##########
@@ -1,64 +0,0 @@
-<!DOCTYPE html>
Review comment:
Is there a reason this was removed?
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description =
"application/json"),
+ @WritesAttribute(attribute = "aggregation.name", description = "The name
of the aggregation whose results are in the output flowfile"),
+ @WritesAttribute(attribute = "aggregation.number", description = "The
number of the aggregation whose results are in the output flowfile"),
+ @WritesAttribute(attribute = "page.number", description = "The number of
the page (request) in which the results were returned that are in the output
flowfile"),
+ @WritesAttribute(attribute = "hit.count", description = "The number of
hits that are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page",
"consume", "json"})
+@CapabilityDescription("A processor that allows the user to repeatedly run a
paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
+ "Search After/Point in Time queries must include a valid \"sort\"
field.")
+@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId,
searchAfter, pitId, hitCount, pageCount, expirationTimestamp) " +
+ "is retained in between invocations of this processor until the
Scroll/PiT has expired " +
+ "(when the current time is later than the last query execution plus
the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"Care should be taken on the size of each page because each response " +
+ "from Elasticsearch will be loaded into memory all at once and
converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
Review comment:
A comment about naming: the name `ConsumeElasticsearch` evokes the
image of consuming a queue or topic, which may be misleading, since nothing
in Elasticsearch actually disappears (is consumed) when the results are
processed. Instead, this is more of a standing query, right?
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
##########
@@ -1,48 +0,0 @@
-<!DOCTYPE html>
Review comment:
Is there a reason this was removed?
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
##########
@@ -0,0 +1,67 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>PaginatedJsonQueryElasticsearch</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css" />
+</head>
+<body>
+ <p>This processor is intended for use with the Elasticsearch JSON DSL and
Elasticsearch 5.X and newer. It is designed
+ to be able to take a query from Kibana and execute it as-is against an
Elasticsearch cluster in a paginated manner.
Review comment:
Same comment as above.