This is an automated email from the ASF dual-hosted git repository.
chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 658f2547d8 NIFI-11430 - PaginatedJsonQueryElasticsearch processors
should not output an empty FlowFile if hits have been found;
PaginatedJsonQueryElasticsearch processors should be able to use the
_source-only and metadata-only result formats when grouping by query
658f2547d8 is described below
commit 658f2547d8f3966375a629d28239ce12b9a1e333
Author: Ryan Van Den Bos <[email protected]>
AuthorDate: Wed Apr 12 10:21:57 2023 +0100
NIFI-11430 - PaginatedJsonQueryElasticsearch processors should not output
an empty FlowFile if hits have been found; PaginatedJsonQueryElasticsearch
processors should be able to use the _source-only and metadata-only result
formats when grouping by query
This closes #7163
Signed-off-by: Chris Sampson <[email protected]>
---
.../AbstractJsonQueryElasticsearch.java | 2 +-
.../AbstractPaginatedJsonQueryElasticsearch.java | 9 +-
.../AbstractJsonQueryElasticsearchTest.groovy | 39 +++---
...tractPaginatedJsonQueryElasticsearchTest.groovy | 150 ++++++++++++++++-----
.../PaginatedJsonQueryElasticsearchTest.groovy | 98 ++++++++------
.../integration/AbstractElasticsearchITBase.java | 2 +-
nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml | 4 +-
7 files changed, 199 insertions(+), 105 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index 1000779143..f12409b074 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -367,7 +367,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q
extends JsonQueryParamete
}
@SuppressWarnings("unchecked")
- private List<Map<String, Object>> formatHits(final List<Map<String,
Object>> hits) {
+ List<Map<String, Object>> formatHits(final List<Map<String, Object>> hits)
{
final List<Map<String, Object>> formattedHits;
if (hitFormat == SearchResultsFormat.METADATA_ONLY) {
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
index 01e5994c47..f26a5a3c52 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -75,6 +75,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch
extends AbstractJs
.build();
static final List<PropertyDescriptor> paginatedPropertyDescriptors;
+
static {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(QUERY_ATTRIBUTE);
@@ -239,7 +240,7 @@ public abstract class
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
private void combineHits(final List<Map<String, Object>> hits, final
PaginatedJsonQueryParameters paginatedJsonQueryParameters,
final ProcessSession session, final FlowFile
parent,
- final Map<String, String> attributes, final
List<FlowFile> hitsFlowFiles) {
+ final Map<String, String> attributes, final
List<FlowFile> hitsFlowFiles, final boolean newQuery) {
if (hits != null && !hits.isEmpty()) {
final FlowFile hitFlowFile;
final boolean append = !hitsFlowFiles.isEmpty();
@@ -251,7 +252,7 @@ public abstract class
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount()
+ hits.size(),
hits, session, hitFlowFile, attributes, append));
- } else if (isOutputNoHits()) {
+ } else if (isOutputNoHits() && newQuery) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile,
attributes));
}
@@ -271,7 +272,9 @@ public abstract class
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
attributes.put("page.number",
Integer.toString(paginatedJsonQueryParameters.getPageCount()));
if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
- combineHits(hits, paginatedJsonQueryParameters, session, parent,
attributes, hitsFlowFiles);
+
+ final List<Map<String, Object>> formattedHits = formatHits(hits);
+ combineHits(formattedHits, paginatedJsonQueryParameters, session,
parent, attributes, hitsFlowFiles, newQuery);
// output results if it seems we've combined all available results
(i.e. no hits in this page and therefore no more expected)
if (!hitsFlowFiles.isEmpty() && (hits == null || hits.isEmpty())) {
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
index 670b9599b0..2eb2b91653 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
@@ -42,9 +42,8 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf
import static org.junit.jupiter.api.Assertions.assertNotNull
import static org.junit.jupiter.api.Assertions.assertThrows
import static org.junit.jupiter.api.Assertions.assertTrue
-
abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryElasticsearch> {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
static final String INDEX_NAME = "messages"
@@ -101,8 +100,9 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
.stream().map(r -> r.getValue())
.collect(Collectors.joining(", "))
final String expectedAllowedSplitHits = processor instanceof
AbstractPaginatedJsonQueryElasticsearch
- ? ResultOutputStrategy.values().collect {r ->
r.getValue()}.join(", ")
- : nonPaginatedResultOutputStrategies
+ ? ResultOutputStrategy.values().collect { r -> r.getValue()
}.join(", ")
+ :
ResultOutputStrategy.getNonPaginatedResponseOutputStrategies().stream()
+ .map(r -> r.getValue()).collect(Collectors.joining(", "))
final AssertionError assertionError =
assertThrows(AssertionError.class, runner.&run)
assertThat(assertionError.getMessage(),
equalTo(String.format("Processor has 8 validation failures:\n" +
@@ -130,7 +130,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
void testBasicQuery() throws Exception {
// test hits (no splitting) - full hit format
final TestRunner runner = createRunner(false)
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]]])))
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT,
SearchResultsFormat.FULL.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
@@ -139,7 +139,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
assertOutputContent(hits.getContent(), 10, false)
final List<Map<String, Object>> result =
OBJECT_MAPPER.readValue(hits.getContent(), List.class)
result.forEach({ hit ->
- final Map<String, Object> h = ((Map<String, Object>)hit)
+ final Map<String, Object> h = ((Map<String, Object>) hit)
assertFalse(h.isEmpty())
assertTrue(h.containsKey("_source"))
assertTrue(h.containsKey("_index"))
@@ -211,14 +211,13 @@ abstract class AbstractJsonQueryElasticsearchTest<P
extends AbstractJsonQueryEla
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(0)
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]]])))
runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS,
"false")
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 0, 0, 0)
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
- pe.getEventType() == ProvenanceEventType.RECEIVE &&
- pe.getAttribute("uuid") ==
hits.getAttribute("uuid")
+ pe.getEventType() == ProvenanceEventType.RECEIVE
}).count(),
is(0L)
)
@@ -247,8 +246,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
@Test
void testAggregations() throws Exception {
String query = prettyPrint(toJson([
- query: [ match_all: [:] ],
- aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [
terms: [ field: "msg" ] ] ]
+ query: [match_all: [:]],
+ aggs : [term_agg: [terms: [field: "msg"]], term_agg2: [terms:
[field: "msg"]]]
]))
// test aggregations (no splitting) - full aggregation format
@@ -289,7 +288,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
agg.keySet().forEach({ aggName ->
final List<Map<String, Object>> termAgg = agg.get(aggName) as
List<Map<String, Object>>
assertThat(termAgg.size(), is(5))
- termAgg.forEach({a ->
+ termAgg.forEach({ a ->
assertTrue(a.containsKey("key"))
assertTrue(a.containsKey("doc_count"))
})
@@ -321,8 +320,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
// test using Expression Language (index, type, query)
query = prettyPrint(toJson([
- query: [ match_all: [:] ],
- aggs: [ term_agg: [ terms: [ field: "\${fieldValue}" ] ],
term_agg2: [ terms: [ field: "\${fieldValue}" ] ] ]
+ query: [match_all: [:]],
+ aggs : [term_agg: [terms: [field: "\${fieldValue}"]],
term_agg2: [terms: [field: "\${fieldValue}"]]]
]))
runner.setVariable("fieldValue", "msg")
runner.setVariable("es.index", INDEX_NAME)
@@ -347,8 +346,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
@Test
void testErrorDuringSearch() throws Exception {
String query = prettyPrint(toJson([
- query: [ match_all: [:] ],
- aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [
terms: [ field: "msg" ] ] ]
+ query: [match_all: [:]],
+ aggs : [term_agg: [terms: [field: "msg"]], term_agg2: [terms:
[field: "msg"]]]
]))
final TestRunner runner = createRunner(true)
@@ -361,8 +360,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
@Test
void testQueryAttribute() throws Exception {
String query = prettyPrint(toJson([
- query: [ match_all: [:] ],
- aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [
terms: [ field: "msg" ] ] ]
+ query: [match_all: [:]],
+ aggs : [term_agg: [terms: [field: "msg"]], term_agg2: [terms:
[field: "msg"]]]
]))
final String queryAttr = "es.query"
@@ -384,7 +383,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
@Test
void testInputHandling() {
final TestRunner runner = createRunner(false)
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]]])))
runner.setIncomingConnection(true)
runner.run()
@@ -399,7 +398,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends
AbstractJsonQueryEla
@Test
void testRequestParameters() {
final TestRunner runner = createRunner(false)
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]]])))
runner.setProperty("refresh", "true")
runner.setProperty("slices", '${slices}')
runner.setVariable("slices", "auto")
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
index 0712a732ed..f1a64e3ca5 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
@@ -32,7 +32,9 @@ import static groovy.json.JsonOutput.toJson
import static org.hamcrest.CoreMatchers.equalTo
import static org.hamcrest.CoreMatchers.is
import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.jupiter.api.Assertions.assertFalse
import static org.junit.jupiter.api.Assertions.assertThrows
+import static org.junit.jupiter.api.Assertions.assertTrue
abstract class AbstractPaginatedJsonQueryElasticsearchTest extends
AbstractJsonQueryElasticsearchTest<AbstractPaginatedJsonQueryElasticsearch> {
abstract boolean isInput()
@@ -40,7 +42,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
@Test
void testInvalidPaginationProperties() {
final TestRunner runner = createRunner(false)
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]]])))
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE,
"not-a-period")
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
"not-enum")
@@ -49,7 +51,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
"'%s' validated against 'not-enum' is invalid because Given
value not found in allowed set '%s'\n" +
"'%s' validated against 'not-a-period' is invalid because Must
be of format <duration> <TimeUnit> where <duration> " +
"is a non-negative integer and TimeUnit is a supported Time
Unit, such as: nanos, millis, secs, mins, hrs, days\n",
-
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(),
PaginationType.values().collect {p -> p.getValue()}.join(", "),
+
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(),
PaginationType.values().collect { p -> p.getValue() }.join(", "),
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName(),
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName()
)))
@@ -59,7 +61,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
void testSinglePage() {
// paged query hits (no splitting)
final TestRunner runner = createRunner(false)
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]]])))
MockFlowFile input = runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
FlowFile hits =
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
@@ -117,13 +119,104 @@ abstract class
AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
assertSendEvent(runner, input)
}
+ static void assertFormattedResult(final SearchResultsFormat
searchResultsFormat, final Map<String, Object> hit) {
+ assertFalse(hit.isEmpty())
+ switch(searchResultsFormat) {
+ case SearchResultsFormat.SOURCE_ONLY:
+ assertFalse(hit.containsKey("_source"))
+ assertFalse(hit.containsKey("_index"))
+ break
+ case SearchResultsFormat.METADATA_ONLY:
+ assertFalse(hit.containsKey("_source"))
+ assertTrue(hit.containsKey("_index"))
+ break
+ case SearchResultsFormat.FULL:
+ assertTrue(hit.containsKey("_source"))
+ assertTrue(hit.containsKey("_index"))
+ break
+ default:
+ throw new IllegalArgumentException("Unknown
SearchResultsFormat value: " + searchResultsFormat.toString())
+ }
+ }
+
+ private void assertResultsFormat(final TestRunner runner, final
ResultOutputStrategy resultOutputStrategy, final SearchResultsFormat
searchResultsFormat) {
+ int flowFileCount
+ String hitsCount
+ boolean ndjson = false
+
+ switch (resultOutputStrategy) {
+ case ResultOutputStrategy.PER_QUERY:
+ flowFileCount = 1
+ hitsCount = "10"
+ ndjson = true
+ break
+ case ResultOutputStrategy.PER_HIT:
+ flowFileCount = 10
+ hitsCount = "1"
+ break
+ case ResultOutputStrategy.PER_RESPONSE:
+ flowFileCount = 1
+ hitsCount = "10"
+ break
+ default:
+ throw new IllegalArgumentException("Unknown
ResultOutputStrategy value: " + resultOutputStrategy.toString())
+ }
+
+ // Test Relationship counts
+ testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0)
+
+ // Per response outputs an array of values
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({
hit ->
+ hit.assertAttributeEquals("hit.count", hitsCount)
+ assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+ if (ResultOutputStrategy.PER_RESPONSE == resultOutputStrategy) {
+ OBJECT_MAPPER.readValue(hit.getContent(),
ArrayList.class).forEach(h -> {
+ assertFormattedResult(searchResultsFormat, h as
Map<String, Object>)
+ })
+ } else {
+ final Map<String, Object> h =
OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+ assertFormattedResult(searchResultsFormat, h)
+ }
+ assertThat(
+ runner.getProvenanceEvents().stream().filter({ pe ->
+ pe.getEventType() == ProvenanceEventType.RECEIVE &&
+ pe.getAttribute("uuid") ==
hit.getAttribute("uuid")
+ }).count(),
+ is(1L)
+ )
+ })
+ }
+
+ @Test
+ void testResultsFormat() throws Exception {
+ for (final ResultOutputStrategy resultOutputStrategy :
ResultOutputStrategy.values()) {
+ final TestRunner runner = createRunner(false)
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order:
"asc"]]]])))
+
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
resultOutputStrategy.getValue())
+
+ // Test against each results format
+ for (final SearchResultsFormat searchResultsFormat :
SearchResultsFormat.values()) {
+
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT,
searchResultsFormat.getValue())
+
+ // Test against each pagination type
+ for (final PaginationType paginationType :
PaginationType.values()) {
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
paginationType.getValue())
+
+ runOnce(runner)
+ assertResultsFormat(runner, resultOutputStrategy,
searchResultsFormat)
+ reset(runner)
+ }
+ }
+ }
+ }
+
@Test
void testScrollError() {
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setThrowErrorInDelete(true)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
PaginationType.SCROLL.getValue())
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([sort: [msg: "desc"], query: [match_all: [:]]])))
// still expect "success" output for exception during final clean-up
runMultiple(runner, 2)
@@ -147,7 +240,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT,
SearchResultsFormat.FULL.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT,
AggregationResultsFormat.FULL.getValue())
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
PaginationType.POINT_IN_TIME.getValue())
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([sort: [msg: "desc"], query: [match_all: [:]]])))
// still expect "success" output for exception during final clean-up
runMultiple(runner, 2)
@@ -169,7 +262,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
final TestElasticsearchClientService service = getService(runner)
service.setThrowErrorInPit(true)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
PaginationType.POINT_IN_TIME.getValue())
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([sort: [msg: "desc"], query: [match_all: [:]]])))
// expect "failure" output for exception during query setup
runOnce(runner)
@@ -190,7 +283,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest
extends AbstractJsonQ
// test PiT without sort
final TestRunner runner = createRunner(false)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
PaginationType.POINT_IN_TIME.getValue())
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [ match_all: [:] ]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]]])))
// expect "failure" output for exception during query setup
runOnce(runner)
@@ -265,46 +358,35 @@ abstract class
AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
is(1L)
)
} else {
- assertThat(runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
+ assertThat(runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.SEND }).count(), is(0L))
}
}
@Test
- void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
+ void testEmptyHitsFlowFileIsProducedForEachResultSplitSetup() {
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]]])))
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order:
"asc"]]]])))
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS,
"true")
service.setMaxPages(0)
- // test that an empty flow file is produced for a per query setup
-
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
ResultOutputStrategy.PER_QUERY.getValue())
- runOnce(runner)
- testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
"0")
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
"1")
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
== 0
- reset(runner)
+ for (final PaginationType paginationType : PaginationType.values()) {
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
paginationType.getValue())
- // test that an empty flow file is produced for a per hit setup
-
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
ResultOutputStrategy.PER_HIT.getValue())
- runOnce(runner)
- testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+ for (final ResultOutputStrategy resultOutputStrategy :
ResultOutputStrategy.values()) {
+ // test that an empty flow file is produced for a per query
setup
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
resultOutputStrategy.getValue())
+ runOnce(runner)
+ testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
"0")
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
"1")
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
== 0
- reset(runner)
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
"0")
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
"1")
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
== 0
+ reset(runner)
+ }
+ }
- // test that an empty flow file is produced for a per response setup
-
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
ResultOutputStrategy.PER_RESPONSE.getValue())
- runOnce(runner)
- testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
"0")
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
"1")
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
== 0
- reset(runner)
}
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
index 992b548b94..30c6eea6a3 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.elasticsearch
-
import org.apache.nifi.processors.elasticsearch.api.PaginationType
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
import org.apache.nifi.util.TestRunner
@@ -40,54 +39,65 @@ class PaginatedJsonQueryElasticsearchTest extends
AbstractPaginatedJsonQueryElas
return true
}
+ static void validatePagination(final TestRunner runner, final
ResultOutputStrategy resultOutputStrategy) {
+ switch (resultOutputStrategy) {
+ case ResultOutputStrategy.PER_RESPONSE:
+ testCounts(runner, 1, 2, 0, 0)
+ int page = 1
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
+ { hit ->
+ hit.assertAttributeEquals("hit.count", "10")
+ hit.assertAttributeEquals("page.number",
Integer.toString(page++))
+ }
+ )
+ break
+ case ResultOutputStrategy.PER_QUERY:
+ testCounts(runner, 1, 1, 0, 0)
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
"20")
+ // the "last" page.number is used, so 2 here because there
were 2 pages of hits
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
"2")
+ assertThat(
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length,
+ is(20)
+ )
+ break
+ case ResultOutputStrategy.PER_HIT:
+ testCounts(runner, 1, 20, 0, 0)
+ int count = 0
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
+ { hit ->
+ hit.assertAttributeEquals("hit.count", "1")
+ // 10 hits per page, so first 10 flow files should
be page.number 1, the rest page.number 2
+ hit.assertAttributeEquals("page.number",
Integer.toString(Math.ceil(++count / 10) as int))
+ }
+ )
+ break
+ default:
+ throw new IllegalArgumentException("Unknown
ResultOutputStrategy value: " + resultOutputStrategy)
+ }
+ }
+
void testPagination(final PaginationType paginationType) {
- // test flowfile per page
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
-
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
paginationType.getValue())
- runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:]
]])))
-
- runOnce(runner)
- testCounts(runner, 1, 2, 0, 0)
- int page = 1
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
- { hit ->
- hit.assertAttributeEquals("hit.count", "10")
- hit.assertAttributeEquals("page.number",
Integer.toString(page++))
- }
- )
- runner.getStateManager().assertStateNotSet()
- reset(runner)
-
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
prettyPrint(toJson([size: 10, sort: [msg: "desc"], query: [match_all: [:]]])))
- // test hits splitting
-
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
ResultOutputStrategy.PER_HIT.getValue())
- runOnce(runner)
- testCounts(runner, 1, 20, 0, 0)
- int count = 0
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
- { hit ->
- hit.assertAttributeEquals("hit.count", "1")
- // 10 hits per page, so first 10 flowfiles should be
page.number 1, the rest page.number 2
- hit.assertAttributeEquals("page.number",
Integer.toString(Math.ceil(++count / 10) as int))
- }
- )
- runner.getStateManager().assertStateNotSet()
- reset(runner)
+ for (final ResultOutputStrategy resultOutputStrategy :
ResultOutputStrategy.values()) {
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
resultOutputStrategy.getValue())
+ runOnce(runner)
+ validatePagination(runner, resultOutputStrategy)
+ runner.getStateManager().assertStateNotSet()
+ reset(runner)
- // test hits combined
-
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
ResultOutputStrategy.PER_QUERY.getValue())
- runOnce(runner)
- testCounts(runner, 1, 1, 0, 0)
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
"20")
- // the "last" page.number is used, so 2 here because there were 2
pages of hits
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
"2")
- assertThat(
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length,
- is(20)
- )
- runner.getStateManager().assertStateNotSet()
+ // Check that OUTPUT_NO_HITS true doesn't have any adverse effects
on pagination
+ runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS,
"true")
+ runOnce(runner)
+ validatePagination(runner, resultOutputStrategy)
+ // Unset OUTPUT_NO_HITS
+ runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS,
"false")
+ reset(runner)
+ }
}
-}
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 5c2b56ba94..3b23e3c729 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -51,7 +51,7 @@ import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
- .parse(System.getProperty("elasticsearch.docker.image",
"docker.elastic.co/elasticsearch/elasticsearch:8.7.0"));
+ .parse(System.getProperty("elasticsearch.docker.image",
"docker.elastic.co/elasticsearch/elasticsearch:8.7.1"));
protected static final String ELASTIC_USER_PASSWORD =
System.getProperty("elasticsearch.elastic_user.password",
RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
new ElasticsearchContainer(IMAGE)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index 5364ec52a1..571a1fbb3d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -101,7 +101,7 @@ language governing permissions and limitations under the
License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in
nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
- <elasticsearch_docker_image>8.7.0</elasticsearch_docker_image>
+ <elasticsearch_docker_image>8.7.1</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>
@@ -132,7 +132,7 @@ language governing permissions and limitations under the
License. -->
<profile>
<id>elasticsearch7</id>
<properties>
- <elasticsearch_docker_image>7.17.9</elasticsearch_docker_image>
+
<elasticsearch_docker_image>7.17.10</elasticsearch_docker_image>
</properties>
</profile>
</profiles>