UNOMI-70 Upgrade to ElasticSearch 5.x - Use scroll API to get unlimited results
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/a94fdb0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/a94fdb0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/a94fdb0e

Branch: refs/heads/master
Commit: a94fdb0e02b3606c012fa5f3d6d35212a34b489d
Parents: dafa9f6
Author: Serge Huber <[email protected]>
Authored: Thu Dec 8 09:14:14 2016 +0100
Committer: Serge Huber <[email protected]>
Committed: Thu Dec 8 09:14:14 2016 +0100

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java | 47 ++++++++++++++++----
 1 file changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a94fdb0e/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 0dc3899..fa4b5e1 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -898,6 +898,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         break;
                     }
                 }
+                client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
 
                 // we're done with the scrolling, delete now
                 if (deleteByScope.numberOfActions() > 0) {
@@ -1188,6 +1189,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 long totalHits = 0;
                 try {
                     String itemType = getItemType(clazz);
+                    final TimeValue keepAlive = TimeValue.timeValueHours(1);
 
                     SearchRequestBuilder requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
                             .setTypes(itemType)
@@ -1199,7 +1201,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     } else if (size != -1) {
                         requestBuilder.setSize(size);
                     } else {
-                        // requestBuilder.setSize(Integer.MAX_VALUE);
+                        // size == -1, use scroll query to retrieve all the results
+                        requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
+                                .setTypes(itemType)
+                                .setFetchSource(true)
+                                .setScroll(keepAlive)
+                                .setFrom(offset)
+                                .setQuery(query)
+                                .setSize(100);
                     }
                     if (routing != null) {
                         requestBuilder.setRouting(routing);
@@ -1229,13 +1238,35 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     SearchResponse response = requestBuilder
                             .execute()
                             .actionGet();
-                    SearchHits searchHits = response.getHits();
-                    totalHits = searchHits.getTotalHits();
-                    for (SearchHit searchHit : searchHits) {
-                        String sourceAsString = searchHit.getSourceAsString();
-                        final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
-                        value.setItemId(searchHit.getId());
-                        results.add(value);
+                    if (size == -1) {
+                        // Scroll until no more hits are returned
+                        while (true) {
+
+                            for (SearchHit searchHit : response.getHits().getHits()) {
+                                // add hit to results
+                                String sourceAsString = searchHit.getSourceAsString();
+                                final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
+                                value.setItemId(searchHit.getId());
+                                results.add(value);
+                            }
+
+                            response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
+
+                            // If we have no more hits, exit
+                            if (response.getHits().getHits().length == 0) {
+                                break;
+                            }
+                        }
+                        client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
+                    } else {
+                        SearchHits searchHits = response.getHits();
+                        totalHits = searchHits.getTotalHits();
+                        for (SearchHit searchHit : searchHits) {
+                            String sourceAsString = searchHit.getSourceAsString();
+                            final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
+                            value.setItemId(searchHit.getId());
+                            results.add(value);
+                        }
                     }
                 } catch (Exception t) {
                     logger.error("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);
