UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates - Initial code to load existing indices - Add code to wait for index startup as there seems to be some problems with this in a cluster configuration.
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/fcea7057 Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/fcea7057 Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/fcea7057 Branch: refs/heads/master Commit: fcea70579e63dea428171754f8113d97dc76ede5 Parents: 9a58214 Author: Serge Huber <[email protected]> Authored: Thu Nov 17 11:07:54 2016 +0100 Committer: Serge Huber <[email protected]> Committed: Thu Nov 17 11:07:54 2016 +0100 ---------------------------------------------------------------------- .../ElasticSearchPersistenceServiceImpl.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/fcea7057/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 2208951..2915c32 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -103,6 +103,7 @@ import java.nio.file.Paths; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.elasticsearch.node.NodeBuilder.nodeBuilder; @@ -163,6 +164,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Map<String,String> indexNames; private List<String> itemsMonthlyIndexed; private Map<String, String> routingByType; + private Set<String> existingIndexNames = new TreeSet<String>(); private String address; private String port; @@ -403,6 +405,23 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, bulkProcessor = getBulkProcessor(); } + try { + IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get(); + existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet()); + } catch (InterruptedException e) { + logger.error("Error retrieving indices stats", e); + } catch (ExecutionException e) { + logger.error("Error retrieving indices stats", e); + } + + logger.info("Waiting for index creation to complete..."); + + client.admin().cluster().prepareHealth() + .setWaitForGreenStatus() + .get(); + + logger.info("Cluster status is GREEN"); + return null; } }.executeInClassLoader();
