This is an automated email from the ASF dual-hosted git repository.
chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2ad33eea80 NIFI-10845 - JsonQueryElasticsearch processors are not
outputting an empty flow file for a combined response with output_no_hits set
to true
2ad33eea80 is described below
commit 2ad33eea8002db00b5bf80edc7b2dc30cb3d1557
Author: Ryan Van Den Bos <[email protected]>
AuthorDate: Tue Nov 22 09:41:44 2022 +0000
NIFI-10845 - JsonQueryElasticsearch processors are not outputting an empty
flow file for a combined response with output_no_hits set to true
Signed-off-by: Chris Sampson <[email protected]>
This closes #6701
---
.../AbstractJsonQueryElasticsearch.java | 8 +++--
.../AbstractPaginatedJsonQueryElasticsearch.java | 3 ++
...tractPaginatedJsonQueryElasticsearchTest.groovy | 39 ++++++++++++++++++++++
3 files changed, 48 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index 1ce0f7f414..f154cf2790 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -105,6 +105,10 @@ public abstract class AbstractJsonQueryElasticsearch<Q
extends JsonQueryParamete
private String splitUpAggregations;
private boolean outputNoHits;
+ boolean getOutputNoHits() {
+ return outputNoHits;
+ }
+
final ObjectMapper mapper = new ObjectMapper();
final AtomicReference<ElasticSearchClientService> clientService = new
AtomicReference<>(null);
@@ -277,8 +281,8 @@ public abstract class AbstractJsonQueryElasticsearch<Q
extends JsonQueryParamete
}
}
- private FlowFile writeHitFlowFile(final int count, final String json,
final ProcessSession session,
- final FlowFile hitFlowFile, final
Map<String, String> attributes) {
+ FlowFile writeHitFlowFile(final int count, final String json, final
ProcessSession session,
+ final FlowFile hitFlowFile, final Map<String,
String> attributes) {
final FlowFile ff = session.write(hitFlowFile, out ->
out.write(json.getBytes()));
attributes.put("hit.count", Integer.toString(count));
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
index 2e8eab6eeb..ec1a020ad7 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -273,6 +273,9 @@ public abstract class
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount()
+ hits.size(),
hits, session, hitFlowFile, attributes, append));
+ } else if (getOutputNoHits()) {
+ final FlowFile hitFlowFile = createChildFlowFile(session, parent);
+ hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile,
attributes));
}
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
index 274e6a2fd5..0ec4470f76 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
@@ -268,4 +268,43 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
assertThat(runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
}
}
+
+ @Test
+ void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
+ final TestRunner runner = createRunner(false)
+ final TestElasticsearchClientService service = getService(runner)
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]]])))
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS,
"true")
+ service.setMaxPages(0)
+
+ // test that an empty flow file is produced for a per query setup
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
+ runOnce(runner)
+ testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
"0")
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
"1")
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
== 0
+ reset(runner)
+
+ // test that an empty flow file is produced for a per hit setup
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_HIT)
+ runOnce(runner)
+ testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
"0")
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
"1")
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
== 0
+ reset(runner)
+
+ // test that an empty flow file is produced for a per response setup
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE)
+ runOnce(runner)
+ testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
"0")
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
"1")
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
== 0
+ reset(runner)
+ }
}