Repository: incubator-unomi Updated Branches: refs/heads/feature-UNOMI-28-ES2X 62d11ded0 -> 661daeea5
UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates - Track index creation through internal memory structure Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/661daeea Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/661daeea Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/661daeea Branch: refs/heads/feature-UNOMI-28-ES2X Commit: 661daeea5951fe7219508e710609a965a3192989 Parents: 62d11de Author: Serge Huber <[email protected]> Authored: Mon Nov 21 17:39:59 2016 +0100 Committer: Serge Huber <[email protected]> Committed: Mon Nov 21 17:39:59 2016 +0100 ---------------------------------------------------------------------- .../ElasticSearchPersistenceServiceImpl.java | 60 +++++++++++++++----- 1 file changed, 45 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/661daeea/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index b954b75..733b3a9 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -411,14 +411,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, bulkProcessor = getBulkProcessor(); } - try { - IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get(); - existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet()); - } catch (InterruptedException e) { - logger.error("Error retrieving indices stats", e); - } catch (ExecutionException e) { - logger.error("Error retrieving indices stats", e); - } + refreshExistingIndexNames(); logger.info("Waiting for index creation to complete..."); @@ -457,7 +450,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, int thisMonth = gc.get(Calendar.MONTH); gc.add(Calendar.DAY_OF_MONTH, 1); if (gc.get(Calendar.MONTH) != thisMonth) { - getMonthlyIndex(gc.getTime(), true); + String monthlyIndex = getMonthlyIndex(gc.getTime(), true); + existingIndexNames.add(monthlyIndex); } } }, 10000L, 24L * 60L * 60L * 1000L); @@ -465,6 +459,23 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.info(this.getClass().getName() + " service started successfully."); } + private void refreshExistingIndexNames() { + new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + try { + logger.info("Refreshing existing indices list..."); + IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get(); + existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet()); + } catch (InterruptedException e) { + logger.error("Error retrieving indices stats", e); + } catch (ExecutionException e) { + logger.error("Error retrieving indices stats", e); + } + return true; + } + }.executeInClassLoader(); + } + public BulkProcessor getBulkProcessor() { if (bulkProcessor != null) { return bulkProcessor; @@ -490,7 +501,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, BulkRequest request, Throwable failure) { logger.error("After Bulk (failure)", failure); - // we could add index creation here in the case of index seperation by dates. } }); if (bulkProcessorName != null && bulkProcessorName.length() > 0) { @@ -554,7 +564,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Object execute(Object... args) { logger.info("Closing ElasticSearch persistence backend..."); if (bulkProcessor != null) { - bulkProcessor.close(); + try { + bulkProcessor.awaitClose(2, TimeUnit.MINUTES); + } catch (InterruptedException e) { + logger.error("Error waiting for bulk operations to flush !", e); + } } node.close(); return null; @@ -730,17 +744,27 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (routingByType.containsKey(itemType)) { indexBuilder = indexBuilder.setRouting(routingByType.get(itemType)); } - try { - indexBuilder.execute().actionGet(); - } catch (IndexNotFoundException e) { + + if (!existingIndexNames.contains(index)) { + // index probably doesn't exist, unless something else has already created it. if (itemsMonthlyIndexed.contains(itemType)) { Date timeStamp = ((TimestampedItem) item).getTimeStamp(); if (timeStamp != null) { getMonthlyIndex(timeStamp, true); - indexBuilder.execute().actionGet(); } else { logger.warn("Missing time stamp on item " + item + " id=" + item.getItemId() + " can't create related monthly index !"); } + } else { + // this is not a timestamped index, should we create it anyway ? + createIndex(index); + } + } + + try { + indexBuilder.execute().actionGet(); + } catch (IndexNotFoundException e) { + if (existingIndexNames.contains(index)) { + existingIndexNames.remove(index); } } return true; @@ -887,6 +911,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, boolean indexExists = indicesExistsResponse.isExists(); if (indexExists) { client.admin().indices().prepareDelete(indexName).execute().actionGet(); + existingIndexNames.remove(indexName); } return indexExists; } @@ -919,6 +944,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } builder.execute().actionGet(); + existingIndexNames.add(indexName); + } @@ -1417,6 +1444,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public void refresh() { new InClassLoaderExecute<Boolean>() { protected Boolean execute(Object... args) { + if (bulkProcessor != null) { + bulkProcessor.flush(); + } client.admin().indices().refresh(Requests.refreshRequest()).actionGet(); return true; }
