This is an automated email from the ASF dual-hosted git repository.

chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e0161f283f [NIFI-10304] [NIFI-14351] Added a config item to 
SearchElasticsearch processor to control if the processor restarts on query 
finish. Also removed query expiration for SEARCH_AFTER pagination type
e0161f283f is described below

commit e0161f283fc4a6a5c07cee7119288a983a052506
Author: Vijay Gorla <[email protected]>
AuthorDate: Sun Apr 6 10:01:54 2025 +1000

    [NIFI-10304] [NIFI-14351] Added a config item to SearchElasticsearch 
processor to control if the processor restarts on query finish. Also removed 
query expiration for SEARCH_AFTER pagination type
    
    Signed-off-by: Chris Sampson <[email protected]>
    
    This closes #9806.
---
 .../AbstractJsonQueryElasticsearch.java            |   4 +
 .../AbstractPaginatedJsonQueryElasticsearch.java   |  26 +--
 .../elasticsearch/ConsumeElasticsearch.java        |   2 +-
 .../PaginatedJsonQueryElasticsearch.java           |   6 +-
 .../elasticsearch/SearchElasticsearch.java         |  69 ++++++--
 .../api/PaginatedJsonQueryParameters.java          |  11 +-
 .../elasticsearch/api/PaginationType.java          |  14 +-
 ...bstractPaginatedJsonQueryElasticsearchTest.java |  28 ++-
 .../elasticsearch/ConsumeElasticsearchTest.java    |   2 +
 .../elasticsearch/SearchElasticsearchTest.java     | 189 ++++++++++++++-------
 10 files changed, 248 insertions(+), 103 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index bd7243b038..65f2f6876e 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -205,6 +205,10 @@ public abstract class AbstractJsonQueryElasticsearch<Q 
extends JsonQueryParamete
         try {
             final Q queryJsonParameters = buildJsonQueryParameters(input, 
context, session);
 
+            if (queryJsonParameters == null) {
+                context.yield();
+                return;
+            }
             List<FlowFile> hitsFlowFiles = new ArrayList<>();
             final StopWatch stopWatch = new StopWatch(true);
             final SearchResponse response = doQuery(queryJsonParameters, 
hitsFlowFiles, session, context, input, stopWatch);
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
index d479e77c42..ec881b65cc 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -69,6 +69,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch 
extends AbstractJs
                     "in between requests (this is not the time expected for 
all pages to be returned, but the maximum " +
                     "allowed time for requests between page retrievals).")
             .required(true)
+            .dependsOn(PAGINATION_TYPE, PaginationType.SCROLL, 
PaginationType.POINT_IN_TIME)
             .defaultValue("10 mins")
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, 24, TimeUnit.HOURS))
@@ -100,9 +101,9 @@ public abstract class 
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
                            final StopWatch stopWatch) throws IOException {
         SearchResponse response = null;
         do {
-            // check any previously started query hasn't expired
-            final boolean expiredQuery = 
isExpired(paginatedJsonQueryParameters, context, response);
-            final boolean newQuery = 
StringUtils.isBlank(paginatedJsonQueryParameters.getPageExpirationTimestamp()) 
|| expiredQuery;
+            // reset query params if query needs to be restarted from the 
beginning
+            this.resetQueryParamsIfRequired(paginatedJsonQueryParameters, 
context);
+            final boolean newQuery = 
paginatedJsonQueryParameters.getPageCount() == 0;
 
             // execute query/scroll
             final String queryJson = updateQueryJson(newQuery, 
paginatedJsonQueryParameters, context, input);
@@ -155,13 +156,14 @@ public abstract class 
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
         final PaginatedJsonQueryParameters paginatedJsonQueryParameters = new 
PaginatedJsonQueryParameters();
         populateCommonJsonQueryParameters(paginatedJsonQueryParameters, input, 
context, session);
 
-        
paginatedJsonQueryParameters.setKeepAlive(context.getProperty(PAGINATION_KEEP_ALIVE).asTimePeriod(TimeUnit.SECONDS)
 + "s");
+        if (this.paginationType.hasExpiry()) {
+            
paginatedJsonQueryParameters.setKeepAlive(context.getProperty(PAGINATION_KEEP_ALIVE).asTimePeriod(TimeUnit.SECONDS)
 + "s");
+        }
 
         return paginatedJsonQueryParameters;
     }
 
-    abstract boolean isExpired(final PaginatedJsonQueryParameters 
paginatedQueryParameters, final ProcessContext context,
-                               final SearchResponse response) throws 
IOException;
+    abstract void resetQueryParamsIfRequired(final 
PaginatedJsonQueryParameters paginatedQueryParameters, final ProcessContext 
context) throws IOException;
 
     abstract String getScrollId(final ProcessContext context, final 
SearchResponse response) throws IOException;
 
@@ -283,12 +285,14 @@ public abstract class 
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
 
     void updateQueryParameters(final PaginatedJsonQueryParameters 
paginatedJsonQueryParameters, final SearchResponse response) {
         paginatedJsonQueryParameters.incrementPageCount();
+        paginatedJsonQueryParameters.setFinished(response.getHits().isEmpty());
 
-        // mark the paginated query for expiry if there are no hits (no more 
pages to obtain so stop looping on this query)
-        final String keepAliveDuration = "PT" + (!response.getHits().isEmpty() 
? paginatedJsonQueryParameters.getKeepAlive() : "0s");
-        paginatedJsonQueryParameters.setPageExpirationTimestamp(
-                
String.valueOf(Instant.now().plus(Duration.parse(keepAliveDuration)).toEpochMilli())
-        );
+        if (this.paginationType.hasExpiry()) {
+            final String keepAliveDuration = "PT" + 
paginatedJsonQueryParameters.getKeepAlive();
+            paginatedJsonQueryParameters.setPageExpirationTimestamp(
+                    
String.valueOf(Instant.now().plus(Duration.parse(keepAliveDuration)).toEpochMilli())
+            );
+        }
     }
 
     void clearElasticsearchState(final ProcessContext context, final 
SearchResponse response, final FlowFile input) {
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
index 7c6a86e099..0817c6c8c3 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
@@ -188,7 +188,7 @@ public class ConsumeElasticsearch extends 
SearchElasticsearch {
     private static final List<PropertyDescriptor> propertyDescriptors = 
Stream.concat(
             Stream.of(RANGE_FIELD, RANGE_FIELD_SORT_ORDER, 
RANGE_INITIAL_VALUE, RANGE_DATE_FORMAT, RANGE_TIME_ZONE, ADDITIONAL_FILTERS),
             scrollPropertyDescriptors.stream()
-                    .filter(pd -> !QUERY.equals(pd) && 
!QUERY_CLAUSE.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd))
+                    .filter(pd -> !QUERY.equals(pd) && 
!QUERY_CLAUSE.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd) && 
!RESTART_ON_FINISH.equals(pd))
                     .map(property -> {
                         if (property == ElasticsearchRestProcessor.SIZE) 
return SIZE;
                         if (property == 
ElasticsearchRestProcessor.AGGREGATIONS) return AGGREGATIONS;
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
index 787bacf966..2df02b5f11 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
@@ -82,10 +82,8 @@ public class PaginatedJsonQueryElasticsearch extends 
AbstractPaginatedJsonQueryE
     }
 
     @Override
-    boolean isExpired(final PaginatedJsonQueryParameters 
paginatedQueryJsonParameters, final ProcessContext context,
-                      final SearchResponse response) {
-        // queries using input FlowFiles don't expire, they run until 
completion
-        return false;
+    void resetQueryParamsIfRequired(final PaginatedJsonQueryParameters 
paginatedQueryJsonParameters, final ProcessContext context) {
+        // Noting to reset. Queries using input FlowFiles don't expire, they 
run until completion
     }
 
     @Override
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
index 7cda87f9f6..684b2d975e 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
@@ -30,6 +30,7 @@ import 
org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
@@ -39,6 +40,7 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
 import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
 import org.apache.nifi.processors.elasticsearch.api.PaginationType;
 import org.apache.nifi.util.StringUtils;
@@ -98,19 +100,31 @@ public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch
     static final String STATE_PAGE_EXPIRATION_TIMESTAMP = 
"pageExpirationTimestamp";
     static final String STATE_PAGE_COUNT = "pageCount";
     static final String STATE_HIT_COUNT = "hitCount";
+    static final String STATE_FINISHED = "finished";
 
     static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.QUERY)
             .description("A query in JSON syntax, not Lucene syntax. Ex: 
{\"query\":{\"match\":{\"somefield\":\"somevalue\"}}}. " +
                     "If the query is empty, a default JSON Object will be 
used, which will result in a \"match_all\" query in Elasticsearch.")
             .build();
 
-    private static final Set<Relationship> relationships = Set.of(REL_HITS, 
REL_AGGREGATIONS);
+    static final PropertyDescriptor RESTART_ON_FINISH = new 
PropertyDescriptor.Builder()
+            .name("restart-on-finish")
+            .displayName("Restart On Finish?")
+            .description("Whether the processor should start another search 
with the same query once a paginated search has completed.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .defaultValue(Boolean.TRUE.toString())
+            .required(true)
+            .build();
+
+    private static final Set<Relationship> relationships = Set.of(REL_FAILURE, 
REL_RETRY, REL_HITS, REL_AGGREGATIONS);
 
     static final List<PropertyDescriptor> scrollPropertyDescriptors = 
Stream.concat(
             Stream.of(
                     // ensure QUERY_DEFINITION_STYLE first for consistency 
between Elasticsearch processors
                     QUERY_DEFINITION_STYLE,
-                    QUERY
+                    QUERY,
+                    RESTART_ON_FINISH
             ),
             paginatedPropertyDescriptors.stream().filter(
                     // override QUERY to change description (no FlowFile 
content used by SearchElasticsearch)
@@ -118,6 +132,8 @@ public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch
             )
     ).toList();
 
+    boolean restartOnFinish = true;
+
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
@@ -132,17 +148,34 @@ public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch
         return Scope.LOCAL;
     }
 
+    @Override
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        if (context.getProperty(RESTART_ON_FINISH).isSet()) {
+            this.restartOnFinish = 
context.getProperty(RESTART_ON_FINISH).asBoolean();
+        }
+    }
+
     @Override
     PaginatedJsonQueryParameters buildJsonQueryParameters(final FlowFile 
input, final ProcessContext context, final ProcessSession session) throws 
IOException {
+        final StateMap stateMap = 
context.getStateManager().getState(getStateScope());
+
+        final boolean finished = stateMap.get(STATE_FINISHED) != null && 
Boolean.parseBoolean(stateMap.get(STATE_FINISHED));
+
+        if (finished && !this.restartOnFinish) {
+            return null;
+        }
+
         final PaginatedJsonQueryParameters paginatedQueryJsonParameters = 
super.buildJsonQueryParameters(input, context, session);
 
-        final StateMap stateMap = 
context.getStateManager().getState(getStateScope());
         paginatedQueryJsonParameters.setHitCount(stateMap.get(STATE_HIT_COUNT) 
== null ? 0 : Integer.parseInt(stateMap.get(STATE_HIT_COUNT)));
         
paginatedQueryJsonParameters.setPageCount(stateMap.get(STATE_PAGE_COUNT) == 
null ? 0 : Integer.parseInt(stateMap.get(STATE_PAGE_COUNT)));
         
paginatedQueryJsonParameters.setScrollId(stateMap.get(STATE_SCROLL_ID));
         
paginatedQueryJsonParameters.setSearchAfter(stateMap.get(STATE_SEARCH_AFTER));
         paginatedQueryJsonParameters.setPitId(stateMap.get(STATE_PIT_ID));
         
paginatedQueryJsonParameters.setPageExpirationTimestamp(stateMap.get(STATE_PAGE_EXPIRATION_TIMESTAMP));
+        paginatedQueryJsonParameters.setFinished(finished);
 
         return paginatedQueryJsonParameters;
     }
@@ -153,11 +186,9 @@ public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch
         final Map<String, String> newStateMap = new HashMap<>(10, 1);
         additionalState(newStateMap, paginatedJsonQueryParameters);
 
-        if (response.getHits().isEmpty()) {
-            getLogger().debug("No more results for paginated query, resetting 
state for future queries");
-        } else {
-            getLogger().debug("Updating state for next execution");
+        getLogger().debug("Updating state for next execution");
 
+        if (!paginatedJsonQueryParameters.isFinished()) {
             if (paginationType == PaginationType.SCROLL) {
                 newStateMap.put(STATE_SCROLL_ID, response.getScrollId());
             } else {
@@ -167,10 +198,14 @@ public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch
                     newStateMap.put(STATE_PIT_ID, response.getPitId());
                 }
             }
-            newStateMap.put(STATE_HIT_COUNT, 
Integer.toString(paginatedJsonQueryParameters.getHitCount()));
-            newStateMap.put(STATE_PAGE_COUNT, 
Integer.toString(paginatedJsonQueryParameters.getPageCount()));
-            newStateMap.put(STATE_PAGE_EXPIRATION_TIMESTAMP, 
paginatedJsonQueryParameters.getPageExpirationTimestamp());
+            if (paginatedJsonQueryParameters.getPageExpirationTimestamp() != 
null) {
+                newStateMap.put(STATE_PAGE_EXPIRATION_TIMESTAMP, 
paginatedJsonQueryParameters.getPageExpirationTimestamp());
+            }
         }
+        newStateMap.put(STATE_HIT_COUNT, 
Integer.toString(paginatedJsonQueryParameters.getHitCount()));
+        newStateMap.put(STATE_PAGE_COUNT, 
Integer.toString(paginatedJsonQueryParameters.getPageCount()));
+        newStateMap.put(STATE_FINISHED, 
Boolean.toString(paginatedJsonQueryParameters.isFinished()));
+
         updateProcessorState(context, newStateMap);
     }
 
@@ -179,13 +214,13 @@ public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch
     }
 
     @Override
-    boolean isExpired(final PaginatedJsonQueryParameters 
paginatedJsonQueryParameters, final ProcessContext context,
-                      final SearchResponse response) throws IOException {
-        final boolean expiredQuery = 
StringUtils.isNotEmpty(paginatedJsonQueryParameters.getPageExpirationTimestamp())
+    void resetQueryParamsIfRequired(final PaginatedJsonQueryParameters 
paginatedJsonQueryParameters, final ProcessContext context) throws IOException {
+        final boolean expiredQuery = this.paginationType.hasExpiry() && 
StringUtils.isNotEmpty(paginatedJsonQueryParameters.getPageExpirationTimestamp())
                 && 
Instant.ofEpochMilli(Long.parseLong(paginatedJsonQueryParameters.getPageExpirationTimestamp())).isBefore(Instant.now());
+        final boolean restaredQuery = 
paginatedJsonQueryParameters.isFinished() && this.restartOnFinish;
 
-        if (expiredQuery) {
-            getLogger().debug("Existing paginated query has expired, resetting 
for new query");
+        if (expiredQuery || restaredQuery) {
+            getLogger().debug("Existing paginated query has expired or 
restarted, resetting for new query");
 
             final Map<String, String> newStateMap = new HashMap<>(1, 1);
             additionalState(newStateMap, paginatedJsonQueryParameters);
@@ -197,8 +232,8 @@ public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch
             paginatedJsonQueryParameters.setPitId(null);
             paginatedJsonQueryParameters.setScrollId(null);
             paginatedJsonQueryParameters.setSearchAfter(null);
+            paginatedJsonQueryParameters.setFinished(false);
         }
-        return expiredQuery;
     }
 
     @Override
@@ -224,4 +259,4 @@ public class SearchElasticsearch extends 
AbstractPaginatedJsonQueryElasticsearch
             context.getStateManager().setState(newStateMap, getStateScope());
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
index 68c34fae0f..4a268dde4b 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
@@ -25,6 +25,7 @@ public class PaginatedJsonQueryParameters extends 
JsonQueryParameters {
     private String pageExpirationTimestamp = null;
     private String keepAlive;
     private String trackingRangeValue;
+    private boolean finished = false;
 
     public int getPageCount() {
         return pageCount;
@@ -85,4 +86,12 @@ public class PaginatedJsonQueryParameters extends 
JsonQueryParameters {
     public void setTrackingRangeValue(String trackingRangeValue) {
         this.trackingRangeValue = trackingRangeValue;
     }
-}
+
+    public boolean isFinished() {
+        return finished;
+    }
+
+    public void setFinished(boolean finished) {
+        this.finished = finished;
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
index c8ea776309..47e84a5e20 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginationType.java
@@ -20,17 +20,19 @@ package org.apache.nifi.processors.elasticsearch.api;
 import org.apache.nifi.components.DescribedValue;
 
 public enum PaginationType implements DescribedValue {
-    SCROLL("pagination-scroll", "Use Elasticsearch \"_scroll\" API to page 
results. Does not accept additional query parameters."),
-    SEARCH_AFTER("pagination-search_after", "Use Elasticsearch 
\"search_after\" _search API to page sorted results."),
+    SCROLL("pagination-scroll", "Use Elasticsearch \"_scroll\" API to page 
results. Does not accept additional query parameters.", true),
+    SEARCH_AFTER("pagination-search_after", "Use Elasticsearch 
\"search_after\" _search API to page sorted results.", false),
     POINT_IN_TIME("pagination-pit", "Use Elasticsearch (7.10+ with XPack) 
\"point in time\" _search API to page sorted results. " +
-            "Not available for use with AWS OpenSearch.");
+            "Not available for use with AWS OpenSearch.", true);
 
     private final String value;
     private final String description;
+    private final boolean hasExpiry;
 
-    PaginationType(final String value, final String description) {
+    PaginationType(final String value, final String description, final boolean 
hasExpiry) {
         this.value = value;
         this.description = description;
+        this.hasExpiry = hasExpiry;
     }
 
     @Override
@@ -47,4 +49,8 @@ public enum PaginationType implements DescribedValue {
     public String getDescription() {
         return description;
     }
+
+    public boolean hasExpiry() {
+        return this.hasExpiry;
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java
index e47aa80465..4ba1409790 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java
@@ -54,18 +54,15 @@ public abstract class 
AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
     }
 
     @Test
-    void testInvalidPaginationProperties() {
+    void testInvalidPaginationTypeProperty() {
         final TestRunner runner = createRunner(false);
         runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
matchAllQuery);
-        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE,
 "not-a-period");
         
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
"not-enum");
 
         final AssertionError assertionError = 
assertThrows(AssertionError.class, runner::run);
         final String expected = String.format("""
-                        Processor has 2 validation failures:
+                        Processor has 1 validation failures:
                         '%s' validated against 'not-enum' is invalid because 
Given value not found in allowed set '%s'
-                        '%s' validated against 'not-a-period' is invalid 
because Must be of format <duration> <TimeUnit> where <duration> \
-                        is a non-negative integer and TimeUnit is a supported 
Time Unit, such as: nanos, millis, secs, mins, hrs, days
                         """,
                 
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(),
                 
Stream.of(PaginationType.values()).map(PaginationType::getValue).collect(Collectors.joining(",
 ")),
@@ -73,6 +70,27 @@ public abstract class 
AbstractPaginatedJsonQueryElasticsearchTest extends Abstra
         assertEquals(expected, assertionError.getMessage());
     }
 
+    @ParameterizedTest
+    @EnumSource(PaginationType.class)
+    void testInvalidPaginationKeepAliveProperty(final PaginationType 
paginationType) {
+        final TestRunner runner = createRunner(false);
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
matchAllQuery);
+        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE,
 "not-a-period");
+        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
paginationType.getValue());
+
+        if (paginationType.hasExpiry()) {
+            runner.assertNotValid();
+            final AssertionError assertionError = 
assertThrows(AssertionError.class, runner::run);
+            final String expected = String.format("Processor has 1 validation 
failures:\n" +
+                    "'%s' validated against 'not-a-period' is invalid because 
Must be of format <duration> <TimeUnit> where <duration> " +
+                    "is a non-negative integer and TimeUnit is a supported 
Time Unit, such as: nanos, millis, secs, mins, hrs, days\n",
+                    
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName());
+            assertEquals(expected, assertionError.getMessage());
+        } else {
+            runner.assertValid();
+        }
+    }
+
     @Test
     void testSinglePage() {
         // paged query hits (no splitting)
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearchTest.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearchTest.java
index 3f85803488..0a9b72fe2d 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearchTest.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearchTest.java
@@ -23,6 +23,7 @@ import com.jayway.jsonpath.JsonPath;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.elasticsearch.SearchResponse;
 import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.processors.elasticsearch.api.PaginationType;
 import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.util.TestRunner;
 import org.junit.jupiter.api.BeforeAll;
@@ -151,6 +152,7 @@ public class ConsumeElasticsearchTest extends 
SearchElasticsearchTest {
         paginatedJsonQueryParameters.setPageCount(pageCount);
         paginatedJsonQueryParameters.setTrackingRangeValue(defaultUnset);
 
+        ((ConsumeElasticsearch) runner.getProcessor()).paginationType = 
PaginationType.SCROLL;
         ((ConsumeElasticsearch) 
runner.getProcessor()).updateQueryParameters(paginatedJsonQueryParameters, 
response);
 
         if ("asc".equals(sortOrder)) {
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.java
index 4075057e19..6668e87b3b 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.java
@@ -67,47 +67,66 @@ public class SearchElasticsearchTest extends 
AbstractPaginatedJsonQueryElasticse
         AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
         
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
 "10");
         
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
 "1");
-        assertState(runner, paginationType, 10, 1);
+        assertState(runner, paginationType, 10, 1, false);
         if (runner.getProcessor() instanceof ConsumeElasticsearch) {
             assertFalse(getService(runner).getQuery().contains("\"five\""));
         }
 
-        // wait for expiration
-        final Instant expiration = 
Instant.ofEpochMilli(Long.parseLong(runner.getStateManager().getState(getStateScope()).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP)));
-        while (expiration.isAfter(Instant.now())) {
-            Thread.sleep(10);
-        }
+        if (paginationType == PaginationType.SEARCH_AFTER) {
+            Thread.sleep(2000); // Slightly longer than PAGINATION_KEEP_ALIVE 
of 1 sec
 
-        if ("true".equalsIgnoreCase(System.getenv("CI"))) {
-            // allow extra time if running in CI Pipeline to prevent 
intermittent timing-issue failures
-            Thread.sleep(1000);
-        }
+            runner.clearTransferState();
 
-        service.resetPageCount();
-        runner.clearTransferState();
+            // does not expire
+            runOnce(runner);
+            AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
+            
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
 "10");
+            
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
 "2");
+            assertState(runner, paginationType, 20, 2, false);
+            if (runner.getProcessor() instanceof ConsumeElasticsearch) {
+                // trackingRangeValue should be retained after previous query 
expiry
+                assertTrue(getService(runner).getQuery().contains("\"five\""));
+            }
+            runner.clearTransferState();
 
-        // first page again (new query after first query expired)
-        runOnce(runner);
-        AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
 "10");
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
 "1");
-        assertState(runner, paginationType, 10, 1);
-        if (runner.getProcessor() instanceof ConsumeElasticsearch) {
-            // trackingRangeValue should be retained after previous query 
expiry
-            assertTrue(getService(runner).getQuery().contains("\"five\""));
-        }
-        runner.clearTransferState();
+        } else {
+            // wait for expiration
+            final Instant expiration = 
Instant.ofEpochMilli(Long.parseLong(runner.getStateManager().getState(getStateScope()).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP)));
+            while (expiration.isAfter(Instant.now())) {
+                Thread.sleep(10);
+            }
 
-        // second page
-        runOnce(runner);
-        AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
 "10");
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
 "2");
-        assertState(runner, paginationType, 20, 2);
-        if (runner.getProcessor() instanceof ConsumeElasticsearch) {
-            assertTrue(getService(runner).getQuery().contains("\"five\""));
+            if ("true".equalsIgnoreCase(System.getenv("CI"))) {
+                // allow extra time if running in CI Pipeline to prevent 
intermittent timing-issue failures
+                Thread.sleep(1000);
+            }
+
+            service.resetPageCount();
+            runner.clearTransferState();
+
+            // first page again (new query after first query expired)
+            runOnce(runner);
+            AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
+            
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
 "10");
+            
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
 "1");
+            assertState(runner, paginationType, 10, 1, false);
+            if (runner.getProcessor() instanceof ConsumeElasticsearch) {
+                // trackingRangeValue should be retained after previous query 
expiry
+                assertTrue(getService(runner).getQuery().contains("\"five\""));
+            }
+            runner.clearTransferState();
+
+            // second page
+            runOnce(runner);
+            AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
+            
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
 "10");
+            
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
 "2");
+            assertState(runner, paginationType, 20, 2, false);
+            if (runner.getProcessor() instanceof ConsumeElasticsearch) {
+                assertTrue(getService(runner).getQuery().contains("\"five\""));
+            }
+            runner.clearTransferState();
         }
-        runner.clearTransferState();
     }
 
     @Override
@@ -120,21 +139,20 @@ public class SearchElasticsearchTest extends 
AbstractPaginatedJsonQueryElasticse
             AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
             
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
 "10");
             
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("page.number",
 String.valueOf(iteration));
-            assertState(runner, paginationType, expectedHitCount, iteration);
+            assertState(runner, paginationType, expectedHitCount, iteration, 
false);
         } else if (perHitResultOutputStrategy && (iteration == 1 || iteration 
== 2)) {
             AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 10, 0, 0);
             
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(hit
 -> {
                 hit.assertAttributeEquals("hit.count", "1");
                 hit.assertAttributeEquals("page.number", 
String.valueOf(iteration));
             });
-            assertState(runner, paginationType, expectedHitCount, iteration);
+            assertState(runner, paginationType, expectedHitCount, iteration, 
false);
         } else if ((perResponseResultOutputStrategy || 
perHitResultOutputStrategy) && iteration == 3) {
             AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 0, 0, 0);
             if (runner.getProcessor() instanceof ConsumeElasticsearch) {
                 assertEquals("five", 
runner.getStateManager().getState(getStateScope()).get(ConsumeElasticsearch.STATE_RANGE_VALUE));
-            } else {
-                
assertTrue(runner.getStateManager().getState(getStateScope()).toMap().isEmpty());
             }
+            assertState(runner, paginationType, 20, 3, true);
         } else if 
(ResultOutputStrategy.PER_QUERY.equals(resultOutputStrategy)) {
             AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
             
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().assertAttributeEquals("hit.count",
 "20");
@@ -143,13 +161,52 @@ public class SearchElasticsearchTest extends 
AbstractPaginatedJsonQueryElasticse
             assertEquals(20, 
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getFirst().getContent().split("\n").length);
             if (runner.getProcessor() instanceof ConsumeElasticsearch) {
                 assertEquals("five", 
runner.getStateManager().getState(getStateScope()).get(ConsumeElasticsearch.STATE_RANGE_VALUE));
-            } else {
-                
assertTrue(runner.getStateManager().getState(getStateScope()).toMap().isEmpty());
             }
+            assertState(runner, paginationType, 20, 3, true);
         }
     }
 
-    private void assertState(final TestRunner runner, final PaginationType 
paginationType, final int hitCount, final int pageCount) throws IOException {
+    @ParameterizedTest
+    @EnumSource(PaginationType.class)
+    void testPaginationWithoutRestartOnFinish(final PaginationType 
paginationType) throws Exception {
+        final TestRunner runner = createRunner(false);
+        final TestElasticsearchClientService service = 
AbstractJsonQueryElasticsearchTest.getService(runner);
+        service.setMaxPages(2);
+        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
paginationType);
+        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
 ResultOutputStrategy.PER_RESPONSE);
+        runner.setProperty(SearchElasticsearch.RESTART_ON_FINISH, "false");
+        setQuery(runner, matchAllWithSortByMsgWithSizeQuery);
+
+        runOnce(runner);
+
+        AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 1, 0, 0);
+        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getLast().assertAttributeEquals("hit.count",
 "10");
+        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getLast().assertAttributeEquals("page.number",
 "1");
+        assertState(runner, paginationType, 10, 1, false);
+        assertFalse(runner.isYieldCalled());
+
+        runOnce(runner);
+
+        AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 2, 0, 0);
+        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getLast().assertAttributeEquals("hit.count",
 "10");
+        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).getLast().assertAttributeEquals("page.number",
 "2");
+        assertState(runner, paginationType, 20, 2, false);
+        assertFalse(runner.isYieldCalled());
+
+        runOnce(runner);
+
+        AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 2, 0, 0);
+        assertState(runner, paginationType, 20, 3, true);
+        assertFalse(runner.isYieldCalled());
+
+        runOnce(runner);
+
+        AbstractJsonQueryElasticsearchTest.testCounts(runner, 0, 2, 0, 0);
+        assertState(runner, paginationType, 20, 3, true);
+        assertTrue(runner.isYieldCalled());
+    }
+
+    private void assertState(final TestRunner runner, final PaginationType 
paginationType, final int hitCount, final int pageCount, final boolean 
finished) throws IOException {
         final MockStateManager stateManager = runner.getStateManager();
 
         stateManager.assertStateEquals(SearchElasticsearch.STATE_HIT_COUNT, 
Integer.toString(hitCount), getStateScope());
@@ -160,27 +217,39 @@ public class SearchElasticsearchTest extends 
AbstractPaginatedJsonQueryElasticse
             
stateManager.assertStateNotSet(ConsumeElasticsearch.STATE_RANGE_VALUE, 
getStateScope());
         }
 
-        final String pageExpirationTimestamp = 
stateManager.getState(getStateScope()).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP);
-        assertTrue(Long.parseLong(pageExpirationTimestamp) > 
Instant.now().toEpochMilli());
-
-        switch (paginationType) {
-            case SCROLL:
-                
stateManager.assertStateEquals(SearchElasticsearch.STATE_SCROLL_ID, "scrollId-" 
+ pageCount, getStateScope());
-                
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, 
getStateScope());
-                
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER, 
getStateScope());
-                break;
-            case POINT_IN_TIME:
-                
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, 
getStateScope());
-                
stateManager.assertStateEquals(SearchElasticsearch.STATE_PIT_ID, "pitId-" + 
pageCount, getStateScope());
-                
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, 
"[\"searchAfter-" + pageCount + "\"]", getStateScope());
-                break;
-            case SEARCH_AFTER:
-                
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, 
getStateScope());
-                
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, 
getStateScope());
-                
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, 
"[\"searchAfter-" + pageCount + "\"]", getStateScope());
-                break;
-            default:
-                fail("Unknown paginationType: " + paginationType);
+        stateManager.assertStateEquals(ConsumeElasticsearch.STATE_FINISHED, 
Boolean.toString(finished), getStateScope());
+        if (finished) {
+            
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, 
getStateScope());
+            stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, 
getStateScope());
+            
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER, 
getStateScope());
+            
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP,
 getStateScope());
+        } else {
+            if (paginationType == PaginationType.SEARCH_AFTER) {
+                
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP,
 getStateScope());
+            } else {
+                final String pageExpirationTimestamp = 
stateManager.getState(getStateScope()).get(SearchElasticsearch.STATE_PAGE_EXPIRATION_TIMESTAMP);
+                assertTrue(Long.parseLong(pageExpirationTimestamp) > 
Instant.now().toEpochMilli());
+            }
+
+            switch (paginationType) {
+                case SCROLL:
+                    
stateManager.assertStateEquals(SearchElasticsearch.STATE_SCROLL_ID, "scrollId-" 
+ pageCount, getStateScope());
+                    
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, 
getStateScope());
+                    
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER, 
getStateScope());
+                    break;
+                case POINT_IN_TIME:
+                    
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, 
getStateScope());
+                    
stateManager.assertStateEquals(SearchElasticsearch.STATE_PIT_ID, "pitId-" + 
pageCount, getStateScope());
+                    
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, 
"[\"searchAfter-" + pageCount + "\"]", getStateScope());
+                    break;
+                case SEARCH_AFTER:
+                    
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, 
getStateScope());
+                    
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, 
getStateScope());
+                    
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, 
"[\"searchAfter-" + pageCount + "\"]", getStateScope());
+                    break;
+                default:
+                    fail("Unknown paginationType: " + paginationType);
+            }
         }
     }
 }


Reply via email to