This is an automated email from the ASF dual-hosted git repository.
chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e0161f283f [NIFI-10304] [NIFI-14351] Added a config item to
SearchElasticsearch processor to control if the processor restarts on query
finish. Also removed query expiration for SEARCH_AFTER pagination type
e0161f283f is described below
commit e0161f283fc4a6a5c07cee7119288a983a052506
Author: Vijay Gorla <[email protected]>
AuthorDate: Sun Apr 6 10:01:54 2025 +1000
[NIFI-10304] [NIFI-14351] Added a config item to SearchElasticsearch
processor to control if the processor restarts on query finish. Also removed
query expiration for SEARCH_AFTER pagination type
Signed-off-by: Chris Sampson <[email protected]>
This closes #9806.
---
.../AbstractJsonQueryElasticsearch.java | 4 +
.../AbstractPaginatedJsonQueryElasticsearch.java | 26 +--
.../elasticsearch/ConsumeElasticsearch.java | 2 +-
.../PaginatedJsonQueryElasticsearch.java | 6 +-
.../elasticsearch/SearchElasticsearch.java | 69 ++++++--
.../api/PaginatedJsonQueryParameters.java | 11 +-
.../elasticsearch/api/PaginationType.java | 14 +-
...bstractPaginatedJsonQueryElasticsearchTest.java | 28 ++-
.../elasticsearch/ConsumeElasticsearchTest.java | 2 +
.../elasticsearch/SearchElasticsearchTest.java | 189 ++++++++++++++-------
10 files changed, 248 insertions(+), 103 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index bd7243b038..65f2f6876e 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -205,6 +205,10 @@ public abstract class AbstractJsonQueryElasticsearch<Q
extends JsonQueryParamete
try {
final Q queryJsonParameters = buildJsonQueryParameters(input,
context, session);
+ if (queryJsonParameters == null) {
+ context.yield();
+ return;
+ }
List<FlowFile> hitsFlowFiles = new ArrayList<>();
final StopWatch stopWatch = new StopWatch(true);
final SearchResponse response = doQuery(queryJsonParameters,
hitsFlowFiles, session, context, input, stopWatch);
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
index d479e77c42..ec881b65cc 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -69,6 +69,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch
extends AbstractJs
"in between requests (this is not the time expected for
all pages to be returned, but the maximum " +
"allowed time for requests between page retrievals).")
.required(true)
+ .dependsOn(PAGINATION_TYPE, PaginationType.SCROLL,
PaginationType.POINT_IN_TIME)
.defaultValue("10 mins")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.createTimePeriodValidator(1,
TimeUnit.SECONDS, 24, TimeUnit.HOURS))
@@ -100,9 +101,9 @@ public abstract class
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
final StopWatch stopWatch) throws IOException {
SearchResponse response = null;
do {
- // check any previously started query hasn't expired
- final boolean expiredQuery =
isExpired(paginatedJsonQueryParameters, context, response);
- final boolean newQuery =
StringUtils.isBlank(paginatedJsonQueryParameters.getPageExpirationTimestamp())
|| expiredQuery;
+ // reset query params if query needs to be restarted from the
beginning
+ this.resetQueryParamsIfRequired(paginatedJsonQueryParameters,
context);
+ final boolean newQuery =
paginatedJsonQueryParameters.getPageCount() == 0;
// execute query/scroll
final String queryJson = updateQueryJson(newQuery,
paginatedJsonQueryParameters, context, input);
@@ -155,13 +156,14 @@ public abstract class
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
final PaginatedJsonQueryParameters paginatedJsonQueryParameters = new
PaginatedJsonQueryParameters();
populateCommonJsonQueryParameters(paginatedJsonQueryParameters, input,
context, session);
-
paginatedJsonQueryParameters.setKeepAlive(context.getProperty(PAGINATION_KEEP_ALIVE).asTimePeriod(TimeUnit.SECONDS)
+ "s");
+ if (this.paginationType.hasExpiry()) {
+
paginatedJsonQueryParameters.setKeepAlive(context.getProperty(PAGINATION_KEEP_ALIVE).asTimePeriod(TimeUnit.SECONDS)
+ "s");
+ }
return paginatedJsonQueryParameters;
}
- abstract boolean isExpired(final PaginatedJsonQueryParameters
paginatedQueryParameters, final ProcessContext context,
- final SearchResponse response) throws
IOException;
+ abstract void resetQueryParamsIfRequired(final
PaginatedJsonQueryParameters paginatedQueryParameters, final ProcessContext
context) throws IOException;
abstract String getScrollId(final ProcessContext context, final
SearchResponse response) throws IOException;
@@ -283,12 +285,14 @@ public abstract class
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
void updateQueryParameters(final PaginatedJsonQueryParameters
paginatedJsonQueryParameters, final SearchResponse response) {
paginatedJsonQueryParameters.incrementPageCount();
+ paginatedJsonQueryParameters.setFinished(response.getHits().isEmpty());
- // mark the paginated query for expiry if there are no hits (no more
pages to obtain so stop looping on this query)
- final String keepAliveDuration = "PT" + (!response.getHits().isEmpty()
? paginatedJsonQueryParameters.getKeepAlive() : "0s");
- paginatedJsonQueryParameters.setPageExpirationTimestamp(
-
String.valueOf(Instant.now().plus(Duration.parse(keepAliveDuration)).toEpochMilli())
- );
+ if (this.paginationType.hasExpiry()) {
+ final String keepAliveDuration = "PT" +
paginatedJsonQueryParameters.getKeepAlive();
+ paginatedJsonQueryParameters.setPageExpirationTimestamp(
+
String.valueOf(Instant.now().plus(Duration.parse(keepAliveDuration)).toEpochMilli())
+ );
+ }
}
void clearElasticsearchState(final ProcessContext context, final
SearchResponse response, final FlowFile input) {
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
index 7c6a86e099..0817c6c8c3 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
@@ -188,7 +188,7 @@ public class ConsumeElasticsearch extends
SearchElasticsearch {
private static final List<PropertyDescriptor> propertyDescriptors =
Stream.concat(
Stream.of(RANGE_FIELD, RANGE_FIELD_SORT_ORDER,
RANGE_INITIAL_VALUE, RANGE_DATE_FORMAT, RANGE_TIME_ZONE, ADDITIONAL_FILTERS),
scrollPropertyDescriptors.stream()
- .filter(pd -> !QUERY.equals(pd) &&
!QUERY_CLAUSE.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd))
+ .filter(pd -> !QUERY.equals(pd) &&
!QUERY_CLAUSE.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd) &&
!RESTART_ON_FINISH.equals(pd))
.map(property -> {
if (property == ElasticsearchRestProcessor.SIZE)
return SIZE;
if (property ==
ElasticsearchRestProcessor.AGGREGATIONS) return AGGREGATIONS;
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
index 787bacf966..2df02b5f11 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
@@ -82,10 +82,8 @@ public class PaginatedJsonQueryElasticsearch extends
AbstractPaginatedJsonQueryE
}
@Override
- boolean isExpired(final PaginatedJsonQueryParameters
paginatedQueryJsonParameters, final ProcessContext context,
- final SearchResponse response) {
- // queries using input FlowFiles don't expire, they run until
completion
- return false;
+ void resetQueryParamsIfRequired(final PaginatedJsonQueryParameters
paginatedQueryJsonParameters, final ProcessContext context) {
+ // Nothing to reset. Queries using input FlowFiles don't expire, they
run until completion
}
@Override
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
index 7cda87f9f6..684b2d975e 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
@@ -30,6 +30,7 @@ import
org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
@@ -39,6 +40,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
import
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
import org.apache.nifi.processors.elasticsearch.api.PaginationType;
import org.apache.nifi.util.StringUtils;
@@ -98,19 +100,31 @@ public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch
static final String STATE_PAGE_EXPIRATION_TIMESTAMP =
"pageExpirationTimestamp";
static final String STATE_PAGE_COUNT = "pageCount";
static final String STATE_HIT_COUNT = "hitCount";
+ static final String STATE_FINISHED = "finished";
static final PropertyDescriptor QUERY = new
PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.QUERY)
.description("A query in JSON syntax, not Lucene syntax. Ex:
{\"query\":{\"match\":{\"somefield\":\"somevalue\"}}}. " +
"If the query is empty, a default JSON Object will be
used, which will result in a \"match_all\" query in Elasticsearch.")
.build();
- private static final Set<Relationship> relationships = Set.of(REL_HITS,
REL_AGGREGATIONS);
+ static final PropertyDescriptor RESTART_ON_FINISH = new
PropertyDescriptor.Builder()
+ .name("restart-on-finish")
+ .displayName("Restart On Finish?")
+ .description("Whether the processor should start another search
with the same query once a paginated search has completed.")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+ .defaultValue(Boolean.TRUE.toString())
+ .required(true)
+ .build();
+
+ private static final Set<Relationship> relationships = Set.of(REL_FAILURE,
REL_RETRY, REL_HITS, REL_AGGREGATIONS);
static final List<PropertyDescriptor> scrollPropertyDescriptors =
Stream.concat(
Stream.of(
// ensure QUERY_DEFINITION_STYLE first for consistency
between Elasticsearch processors
QUERY_DEFINITION_STYLE,
- QUERY
+ QUERY,
+ RESTART_ON_FINISH
),
paginatedPropertyDescriptors.stream().filter(
// override QUERY to change description (no FlowFile
content used by SearchElasticsearch)
@@ -118,6 +132,8 @@ public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch
)
).toList();
+ boolean restartOnFinish = true;
+
@Override
public Set<Relationship> getRelationships() {
return relationships;
@@ -132,17 +148,34 @@ public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch
return Scope.LOCAL;
}
+ @Override
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ super.onScheduled(context);
+ if (context.getProperty(RESTART_ON_FINISH).isSet()) {
+ this.restartOnFinish =
context.getProperty(RESTART_ON_FINISH).asBoolean();
+ }
+ }
+
@Override
PaginatedJsonQueryParameters buildJsonQueryParameters(final FlowFile
input, final ProcessContext context, final ProcessSession session) throws
IOException {
+ final StateMap stateMap =
context.getStateManager().getState(getStateScope());
+
+ final boolean finished = stateMap.get(STATE_FINISHED) != null &&
Boolean.parseBoolean(stateMap.get(STATE_FINISHED));
+
+ if (finished && !this.restartOnFinish) {
+ return null;
+ }
+
final PaginatedJsonQueryParameters paginatedQueryJsonParameters =
super.buildJsonQueryParameters(input, context, session);
- final StateMap stateMap =
context.getStateManager().getState(getStateScope());
paginatedQueryJsonParameters.setHitCount(stateMap.get(STATE_HIT_COUNT)
== null ? 0 : Integer.parseInt(stateMap.get(STATE_HIT_COUNT)));
paginatedQueryJsonParameters.setPageCount(stateMap.get(STATE_PAGE_COUNT) ==
null ? 0 : Integer.parseInt(stateMap.get(STATE_PAGE_COUNT)));
paginatedQueryJsonParameters.setScrollId(stateMap.get(STATE_SCROLL_ID));
paginatedQueryJsonParameters.setSearchAfter(stateMap.get(STATE_SEARCH_AFTER));
paginatedQueryJsonParameters.setPitId(stateMap.get(STATE_PIT_ID));
paginatedQueryJsonParameters.setPageExpirationTimestamp(stateMap.get(STATE_PAGE_EXPIRATION_TIMESTAMP));
+ paginatedQueryJsonParameters.setFinished(finished);
return paginatedQueryJsonParameters;
}
@@ -153,11 +186,9 @@ public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch
final Map<String, String> newStateMap = new HashMap<>(10, 1);
additionalState(newStateMap, paginatedJsonQueryParameters);
- if (response.getHits().isEmpty()) {
- getLogger().debug("No more results for paginated query, resetting
state for future queries");
- } else {
- getLogger().debug("Updating state for next execution");
+ getLogger().debug("Updating state for next execution");
+ if (!paginatedJsonQueryParameters.isFinished()) {
if (paginationType == PaginationType.SCROLL) {
newStateMap.put(STATE_SCROLL_ID, response.getScrollId());
} else {
@@ -167,10 +198,14 @@ public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch
newStateMap.put(STATE_PIT_ID, response.getPitId());
}
}
- newStateMap.put(STATE_HIT_COUNT,
Integer.toString(paginatedJsonQueryParameters.getHitCount()));
- newStateMap.put(STATE_PAGE_COUNT,
Integer.toString(paginatedJsonQueryParameters.getPageCount()));
- newStateMap.put(STATE_PAGE_EXPIRATION_TIMESTAMP,
paginatedJsonQueryParameters.getPageExpirationTimestamp());
+ if (paginatedJsonQueryParameters.getPageExpirationTimestamp() !=
null) {
+ newStateMap.put(STATE_PAGE_EXPIRATION_TIMESTAMP,
paginatedJsonQueryParameters.getPageExpirationTimestamp());
+ }
}
+ newStateMap.put(STATE_HIT_COUNT,
Integer.toString(paginatedJsonQueryParameters.getHitCount()));
+ newStateMap.put(STATE_PAGE_COUNT,
Integer.toString(paginatedJsonQueryParameters.getPageCount()));
+ newStateMap.put(STATE_FINISHED,
Boolean.toString(paginatedJsonQueryParameters.isFinished()));
+
updateProcessorState(context, newStateMap);
}
@@ -179,13 +214,13 @@ public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch
}
@Override
- boolean isExpired(final PaginatedJsonQueryParameters
paginatedJsonQueryParameters, final ProcessContext context,
- final SearchResponse response) throws IOException {
- final boolean expiredQuery =
StringUtils.isNotEmpty(paginatedJsonQueryParameters.getPageExpirationTimestamp())
+ void resetQueryParamsIfRequired(final PaginatedJsonQueryParameters
paginatedJsonQueryParameters, final ProcessContext context) throws IOException {
+ final boolean expiredQuery = this.paginationType.hasExpiry() &&
StringUtils.isNotEmpty(paginatedJsonQueryParameters.getPageExpirationTimestamp())
&&
Instant.ofEpochMilli(Long.parseLong(paginatedJsonQueryParameters.getPageExpirationTimestamp())).isBefore(Instant.now());
+ final boolean restaredQuery =
paginatedJsonQueryParameters.isFinished() && this.restartOnFinish;
- if (expiredQuery) {
- getLogger().debug("Existing paginated query has expired, resetting
for new query");
+ if (expiredQuery || restaredQuery) {
+ getLogger().debug("Existing paginated query has expired or
restarted, resetting for new query");
final Map<String, String> newStateMap = new HashMap<>(1, 1);
additionalState(newStateMap, paginatedJsonQueryParameters);
@@ -197,8 +232,8 @@ public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch
paginatedJsonQueryParameters.setPitId(null);
paginatedJsonQueryParameters.setScrollId(null);
paginatedJsonQueryParameters.setSearchAfter(null);
+ paginatedJsonQueryParameters.setFinished(false);
}
- return expiredQuery;
}
@Override
@@ -224,4 +259,4 @@ public class SearchElasticsearch extends
AbstractPaginatedJsonQueryElasticsearch
context.getStateManager().setState(newStateMap, getStateScope());
}
}
-}
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
index 68c34fae0f..4a268dde4b 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
@@ -25,6 +25,7 @@ public class PaginatedJsonQueryParameters extends
JsonQueryParameters {
private String pageExpirationTimestamp = null;
private String keepAlive;
private String trackingRangeValue;
+ private boolean finished = false;
public int getPageCount() {
return pageCount;
@@ -85,4 +86,12 @@ public class PaginatedJsonQueryParameters extends
JsonQueryParameters {
public void setTrackingRangeValue(String trackingRangeValue) {
this.trackingRangeValue = trackingRangeValue;
}
-}
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ public void setFinished(boolean finished) {
+ this.finished = finished;
+ }
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
index c8ea776309..47e84a5e20 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
@@ -20,17 +20,19 @@ package org.apache.nifi.processors.elasticsearch.api;
import org.apache.nifi.components.DescribedValue;
public enum PaginationType implements DescribedValue {
- SCROLL("pagination-scroll", "Use Elasticsearch \"_scroll\" API to page
results. Does not accept additional query parameters."),
- SEARCH_AFTER("pagination-search_after", "Use Elasticsearch
\"search_after\" _search API to page sorted results."),
+ SCROLL("pagination-scroll", "Use Elasticsearch \"_scroll\" API to page
results. Does not accept additional query parameters.", true),
+ SEARCH_AFTER("pagination-search_after", "Use Elasticsearch
\"search_after\" _search API to page sorted results.", false),
POINT_IN_TIME("pagination-pit", "Use Elasticsearch (7.10+ with XPack)
\"point in time\" _search API to page sorted results. " +
- "Not available for use with AWS OpenSearch.");
+ "Not available for use with AWS OpenSearch.", true);
private final String value;
private final String description;
+ private final boolean hasExpiry;
- PaginationType(final String value, final String description) {
+ PaginationType(final String value, final String description, final boolean
hasExpiry) {
this.value = value;
this.description = description;
+ this.hasExpiry = hasExpiry;
}
@Override
@@ -47,4 +49,8 @@ public enum PaginationType implements DescribedValue {
public String getDescription() {
return description;
}
+
+ public boolean hasExpiry() {
+ return this.hasExpiry;
+ }
}
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java
index e47aa80465..4ba1409790 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java
@@ -54,18 +54,15 @@ public abstract class
AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
}
@Test
- void testInvalidPaginationProperties() {
+ void testInvalidPaginationTypeProperty() {
final TestRunner runner = createRunner(false);
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
matchAllQuery);
-
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE,
"not-a-period");
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
"not-enum");
final AssertionError assertionError =
assertThrows(AssertionError.class, runner::run);
final String expected = String.format("""
- Processor has 2 validation failures:
+ Processor has 1 validation failures:
'%s' validated against 'not-enum' is invalid because
Given value not found in allowed set '%s'
- '%s' validated against 'not-a-period' is invalid
because Must be of format <duration> <TimeUnit> where <duration> \
- is a non-negative integer and TimeUnit is a supported
Time Unit, such as: nanos, millis, secs, mins, hrs, days
""",
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(),
Stream.of(PaginationType.values()).map(PaginationType::getValue).collect(Collectors.joining(",
")),
@@ -73,6 +70,27 @@ public abstract class
AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
assertEquals(expected, assertionError.getMessage());
}
+ @ParameterizedTest
+ @EnumSource(PaginationType.class)
+ void testInvalidPaginationKeepAliveProperty(final PaginationType
paginationType) {
+ final TestRunner runner = createRunner(false);
+ runner.setProperty(AbstractJsonQueryElasticsearch.QUERY,
matchAllQuery);
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE,
"not-a-period");
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
paginationType.getValue());
+
+ if (paginationType.hasExpiry()) {
+ runner.assertNotValid();
+ final AssertionError assertionError =
assertThrows(AssertionError.class, runner::run);
+ final String expected = String.format("Processor has 1 validation
failures:\n" +
+ "'%s' validated against 'not-a-period' is invalid because
Must be of format <duration> <TimeUnit> where <duration> " +
+ "is a non-negative integer and TimeUnit is a supported
Time Unit, such as: nanos, millis, secs, mins, hrs, days\n",
+
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName());
+ assertEquals(expected, assertionError.getMessage());
+ } else {
+ runner.assertValid();
+ }
+ }
+
@Test
void testSinglePage() {
// paged query hits (no splitting)
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearchTest.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearchTest.java
index 3f85803488..0a9b72fe2d 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearchTest.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearchTest.java
@@ -23,6 +23,7 @@ import com.jayway.jsonpath.JsonPath;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.elasticsearch.SearchResponse;
import
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.processors.elasticsearch.api.PaginationType;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.BeforeAll;
@@ -151,6 +152,7 @@ public class ConsumeElasticsearchTest extends
SearchElasticsearchTest {
paginatedJsonQueryParameters.setPageCount(pageCount);
paginatedJsonQueryParameters.setTrackingRangeValue(defaultUnset);
+ ((ConsumeElasticsearch) runner.getProcessor()).paginationType =
PaginationType.SCROLL;
((ConsumeElasticsearch)
runner.getProcessor()).updateQueryParameters(paginatedJsonQueryParameters,
response);
if ("asc".equals(sortOrder)) {
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.java
index 4075057e19..6668e87b3b 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.java
@@ -67,47 +67,66 @@ public class SearchElasticsearchTest extends
AbstractPaginatedJsonQueryElasticse
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
"10");
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
"1");
- assertState(runner, paginationType, 10, 1);
+ assertState(runner, paginationType, 10, 1, false);
if (runner.getProcessor() instanceof ConsumeElasticsearch) {
assertFalse(getService(runner).getQuery().contains("\"five\""));
}
- // wait for expiration
- final Instant expiration =
Instant.ofEpochMilli(Long.parseLong(runner.getStateManager().getState(getStateScope()).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP)));
- while (expiration.isAfter(Instant.now())) {
- Thread.sleep(10);
- }
+ if (paginationType == PaginationType.SEARCH_AFTER) {
+ Thread.sleep(2000); // Slightly longer than PAGINATION_KEEP_ALIVE
of 1 sec
- if ("true".equalsIgnoreCase(System.getenv("CI"))) {
- // allow extra time if running in CI Pipeline to prevent
intermittent timing-issue failures
- Thread.sleep(1000);
- }
+ runner.clearTransferState();
- service.resetPageCount();
- runner.clearTransferState();
+ // does not expire
+ runOnce(runner);
+ AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
"10");
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
"2");
+ assertState(runner, paginationType, 20, 2, false);
+ if (runner.getProcessor() instanceof ConsumeElasticsearch) {
+ // trackingRangeValue should be retained for the next page (SEARCH_AFTER
queries do not expire)
+ assertTrue(getService(runner).getQuery().contains("\"five\""));
+ }
+ runner.clearTransferState();
- // first page again (new query after first query expired)
- runOnce(runner);
- AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
"10");
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
"1");
- assertState(runner, paginationType, 10, 1);
- if (runner.getProcessor() instanceof ConsumeElasticsearch) {
- // trackingRangeValue should be retained after previous query
expiry
- assertTrue(getService(runner).getQuery().contains("\"five\""));
- }
- runner.clearTransferState();
+ } else {
+ // wait for expiration
+ final Instant expiration =
Instant.ofEpochMilli(Long.parseLong(runner.getStateManager().getState(getStateScope()).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP)));
+ while (expiration.isAfter(Instant.now())) {
+ Thread.sleep(10);
+ }
- // second page
- runOnce(runner);
- AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
"10");
-
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
"2");
- assertState(runner, paginationType, 20, 2);
- if (runner.getProcessor() instanceof ConsumeElasticsearch) {
- assertTrue(getService(runner).getQuery().contains("\"five\""));
+ if ("true".equalsIgnoreCase(System.getenv("CI"))) {
+ // allow extra time if running in CI Pipeline to prevent
intermittent timing-issue failures
+ Thread.sleep(1000);
+ }
+
+ service.resetPageCount();
+ runner.clearTransferState();
+
+ // first page again (new query after first query expired)
+ runOnce(runner);
+ AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
"10");
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
"1");
+ assertState(runner, paginationType, 10, 1, false);
+ if (runner.getProcessor() instanceof ConsumeElasticsearch) {
+ // trackingRangeValue should be retained after previous query
expiry
+ assertTrue(getService(runner).getQuery().contains("\"five\""));
+ }
+ runner.clearTransferState();
+
+ // second page
+ runOnce(runner);
+ AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
"10");
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
"2");
+ assertState(runner, paginationType, 20, 2, false);
+ if (runner.getProcessor() instanceof ConsumeElasticsearch) {
+ assertTrue(getService(runner).getQuery().contains("\"five\""));
+ }
+ runner.clearTransferState();
}
- runner.clearTransferState();
}
@Override
@@ -120,21 +139,20 @@ public class SearchElasticsearchTest extends
AbstractPaginatedJsonQueryElasticse
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
"10");
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
String.valueOf(iteration));
- assertState(runner, paginationType, expectedHitCount, iteration);
+ assertState(runner, paginationType, expectedHitCount, iteration,
false);
} else if (perHitResultOutputStrategy && (iteration == 1 || iteration
== 2)) {
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 10, 0, 0);
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(hit
-> {
hit.assertAttributeEquals("hit.count", "1");
hit.assertAttributeEquals("page.number",
String.valueOf(iteration));
});
- assertState(runner, paginationType, expectedHitCount, iteration);
+ assertState(runner, paginationType, expectedHitCount, iteration,
false);
} else if ((perResponseResultOutputStrategy ||
perHitResultOutputStrategy) && iteration == 3) {
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 0, 0, 0);
if (runner.getProcessor() instanceof ConsumeElasticsearch) {
assertEquals("five",
runner.getStateManager().getState(getStateScope()).get(ConsumeElasticsearch.STATE_RANGE_VALUE));
- } else {
-
assertTrue(runner.getStateManager().getState(getStateScope()).toMap().isEmpty());
}
+ assertState(runner, paginationType, 20, 3, true);
} else if
(ResultOutputStrategy.PER_QUERY.equals(resultOutputStrategy)) {
AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
"20");
@@ -143,13 +161,52 @@ public class SearchElasticsearchTest extends
AbstractPaginatedJsonQueryElasticse
assertEquals(20,
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().getContent().split("\n").length);
if (runner.getProcessor() instanceof ConsumeElasticsearch) {
assertEquals("five",
runner.getStateManager().getState(getStateScope()).get(ConsumeElasticsearch.STATE_RANGE_VALUE));
- } else {
-
assertTrue(runner.getStateManager().getState(getStateScope()).toMap().isEmpty());
}
+ assertState(runner, paginationType, 20, 3, true);
}
}
- private void assertState(final TestRunner runner, final PaginationType
paginationType, final int hitCount, final int pageCount) throws IOException {
+ /**
+ * Verifies SearchElasticsearch when RESTART_ON_FINISH is "false": the query is
+ * paged through to completion once, after which the processor stays finished
+ * and yields on later triggers instead of emitting the results again.
+ * Runs once for every PaginationType.
+ */
+ @ParameterizedTest
+ @EnumSource(PaginationType.class)
+ void testPaginationWithoutRestartOnFinish(final PaginationType
paginationType) throws Exception {
+ final TestRunner runner = createRunner(false);
+ final TestElasticsearchClientService service =
AbstractJsonQueryElasticsearchTest.getService(runner);
+ // NOTE(review): presumably caps the mock client at two pages of hits; the
+ // third run below produces no output and marks the query finished - confirm
+ service.setMaxPages(2);
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE,
paginationType);
+
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
ResultOutputStrategy.PER_RESPONSE);
+ // the setting under test: do not start the query again once it has finished
+ runner.setProperty(SearchElasticsearch.RESTART_ON_FINISH, "false");
+ setQuery(runner, matchAllWithSortByMsgWithSizeQuery);
+
+ // first page: one PER_RESPONSE flowfile, query not finished, no yield
+ runOnce(runner);
+
+ AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getLast().assertAttributeEquals("hit.count",
"10");
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getLast().assertAttributeEquals("page.number",
"1");
+ assertState(runner, paginationType, 10, 1, false);
+ assertFalse(runner.isYieldCalled());
+
+ // second page: transfer state is never cleared in this test, so flowfile
+ // counts are cumulative and getLast() selects the newest page
+ runOnce(runner);
+
+ AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 2, 0, 0);
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getLast().assertAttributeEquals("hit.count",
"10");
+
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getLast().assertAttributeEquals("page.number",
"2");
+ assertState(runner, paginationType, 20, 2, false);
+ assertFalse(runner.isYieldCalled());
+
+ // third run: no further pages, so no new flowfiles; the query is marked
+ // finished and the processor does not yield yet
+ runOnce(runner);
+
+ AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 2, 0, 0);
+ assertState(runner, paginationType, 20, 3, true);
+ assertFalse(runner.isYieldCalled());
+
+ // fourth run: with restart disabled the finished query is not run again;
+ // there is no new output, state is unchanged, and the processor yields
+ runOnce(runner);
+
+ AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 2, 0, 0);
+ assertState(runner, paginationType, 20, 3, true);
+ assertTrue(runner.isYieldCalled());
+ }
+
+ private void assertState(final TestRunner runner, final PaginationType
paginationType, final int hitCount, final int pageCount, final boolean
finished) throws IOException {
final MockStateManager stateManager = runner.getStateManager();
stateManager.assertStateEquals(SearchElasticsearch.STATE_HIT_COUNT,
Integer.toString(hitCount), getStateScope());
@@ -160,27 +217,39 @@ public class SearchElasticsearchTest extends
AbstractPaginatedJsonQueryElasticse
stateManager.assertStateNotSet(ConsumeElasticsearch.STATE_RANGE_VALUE,
getStateScope());
}
- final String pageExpirationTimestamp =
stateManager.getState(getStateScope()).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP);
- assertTrue(Long.parseLong(pageExpirationTimestamp) >
Instant.now().toEpochMilli());
-
- switch (paginationType) {
- case SCROLL:
-
stateManager.assertStateEquals(SearchElasticsearch.STATE_SCROLL_ID, "scrollId-"
+ pageCount, getStateScope());
-
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID,
getStateScope());
-
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER,
getStateScope());
- break;
- case POINT_IN_TIME:
-
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID,
getStateScope());
-
stateManager.assertStateEquals(SearchElasticsearch.STATE_PIT_ID, "pitId-" +
pageCount, getStateScope());
-
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER,
"[\"searchAfter-" + pageCount + "\"]", getStateScope());
- break;
- case SEARCH_AFTER:
-
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID,
getStateScope());
-
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID,
getStateScope());
-
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER,
"[\"searchAfter-" + pageCount + "\"]", getStateScope());
- break;
- default:
- fail("Unknown paginationType: " + paginationType);
+ stateManager.assertStateEquals(ConsumeElasticsearch.STATE_FINISHED,
Boolean.toString(finished), getStateScope());
+ if (finished) {
+
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID,
getStateScope());
+ stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID,
getStateScope());
+
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER,
getStateScope());
+
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP,
getStateScope());
+ } else {
+ if (paginationType == PaginationType.SEARCH_AFTER) {
+
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP,
getStateScope());
+ } else {
+ final String pageExpirationTimestamp =
stateManager.getState(getStateScope()).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP);
+ assertTrue(Long.parseLong(pageExpirationTimestamp) >
Instant.now().toEpochMilli());
+ }
+
+ switch (paginationType) {
+ case SCROLL:
+
stateManager.assertStateEquals(SearchElasticsearch.STATE_SCROLL_ID, "scrollId-"
+ pageCount, getStateScope());
+
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID,
getStateScope());
+
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER,
getStateScope());
+ break;
+ case POINT_IN_TIME:
+
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID,
getStateScope());
+
stateManager.assertStateEquals(SearchElasticsearch.STATE_PIT_ID, "pitId-" +
pageCount, getStateScope());
+
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER,
"[\"searchAfter-" + pageCount + "\"]", getStateScope());
+ break;
+ case SEARCH_AFTER:
+
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID,
getStateScope());
+
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID,
getStateScope());
+
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER,
"[\"searchAfter-" + pageCount + "\"]", getStateScope());
+ break;
+ default:
+ fail("Unknown paginationType: " + paginationType);
+ }
}
}
}