Repository: incubator-unomi
Updated Branches:
  refs/heads/master 8de0f782b -> 79ba27236
UNOMI-130 Use index templates to create new indexes with default mappings instead of Java code
- Replace manual index creation for monthly indexes

Signed-off-by: Serge Huber <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/79ba2723
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/79ba2723
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/79ba2723

Branch: refs/heads/master
Commit: 79ba27236e3168798ffa234c07d4d7c6beeceb5a
Parents: 8de0f78
Author: Serge Huber <[email protected]>
Authored: Tue Oct 17 17:08:53 2017 +0200
Committer: Serge Huber <[email protected]>
Committed: Tue Oct 17 17:08:53 2017 +0200

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java | 209 +++++++++----------
 1 file changed, 95 insertions(+), 114 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/79ba2723/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 4b2df2e..1461c33 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -34,6 +34,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -51,6 +56,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -128,12 +134,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Map<String, String> indexNames; private List<String> itemsMonthlyIndexed; private Map<String, String> routingByType; - private Set<String> existingIndexNames = new TreeSet<String>(); private Integer defaultQueryLimit = 10; - private Timer timer; - private String bulkProcessorConcurrentRequests = "1"; private String bulkProcessorBulkActions = "1000"; private String bulkProcessorBulkSize = "5MB"; @@ -242,8 +245,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public void start() throws Exception { - loadPredefinedMappings(bundleContext, false); - // on startup new InClassLoaderExecute<Object>() { 
public Object execute(Object... args) throws Exception { @@ -298,6 +299,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, throw new Exception("Error checking ElasticSearch versions", e); } + loadPredefinedMappings(bundleContext, false); + + // load predefined mappings and condition dispatchers of any bundles that were started before this one. + for (Bundle existingBundle : bundleContext.getBundles()) { + if (existingBundle.getBundleContext() != null) { + loadPredefinedMappings(existingBundle.getBundleContext(), false); + } + } + // @todo is there a better way to detect index existence than to wait for it to startup ? boolean indexExists = false; int tries = 0; @@ -331,22 +341,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - client.admin().indices().preparePutTemplate(indexName + "_monthlyindex") - .setTemplate(indexName + "-*") - .setOrder(1) - .setSettings(Settings.builder() - .put(NUMBER_OF_SHARDS, Integer.parseInt(monthlyIndexNumberOfShards)) - .put(NUMBER_OF_REPLICAS, Integer.parseInt(monthlyIndexNumberOfReplicas)) - .build()).execute().actionGet(); - - getMonthlyIndex(new Date(), true); + createMonthlyIndexTemplate(); if (client != null && bulkProcessor == null) { bulkProcessor = getBulkProcessor(); } - refreshExistingIndexNames(); - logger.info("Waiting for GREEN cluster status..."); client.admin().cluster().prepareHealth() @@ -359,51 +359,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } }.executeInClassLoader(); - bundleContext.addBundleListener(this); - timer = new Timer(); - - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - GregorianCalendar gc = new GregorianCalendar(); - int thisMonth = gc.get(Calendar.MONTH); - gc.add(Calendar.DAY_OF_MONTH, 1); - if (gc.get(Calendar.MONTH) != thisMonth) { - String monthlyIndex = getMonthlyIndex(gc.getTime(), true); - existingIndexNames.add(monthlyIndex); - } - } - }, 
10000L, 24L * 60L * 60L * 1000L); - - // load predefined mappings and condition dispatchers of any bundles that were started before this one. - for (Bundle existingBundle : bundleContext.getBundles()) { - if (existingBundle.getBundleContext() != null) { - loadPredefinedMappings(existingBundle.getBundleContext(), true); - } - } - logger.info(this.getClass().getName() + " service started successfully."); } - private void refreshExistingIndexNames() { - new InClassLoaderExecute<Boolean>() { - protected Boolean execute(Object... args) throws Exception { - try { - logger.info("Refreshing existing indices list..."); - IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get(); - existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet()); - } catch (InterruptedException e) { - throw new Exception("Error retrieving indices stats", e); - } catch (ExecutionException e) { - throw new Exception("Error retrieving indices stats", e); - } - return true; - } - }.catchingExecuteInClassLoader(true); - } - public BulkProcessor getBulkProcessor() { if (bulkProcessor != null) { return bulkProcessor; @@ -502,11 +462,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } }.catchingExecuteInClassLoader(true); - if (timer != null) { - timer.cancel(); - timer = null; - } - bundleContext.removeBundleListener(this); } @@ -543,32 +498,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - private String getMonthlyIndex(Date date) { - return getMonthlyIndex(date, false); - } - - private String getMonthlyIndex(Date date, boolean checkAndCreate) { + private String getMonthlyIndexName(Date date) { String d = new SimpleDateFormat("-YYYY-MM").format(date); String monthlyIndexName = indexName + d; - - if (checkAndCreate) { - IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(monthlyIndexName).execute().actionGet(); - boolean 
indexExists = indicesExistsResponse.isExists(); - if (!indexExists) { - logger.info("{} index doesn't exist yet, creating it...", monthlyIndexName); - - Map<String, String> indexMappings = new HashMap<String, String>(); - indexMappings.put("_default_", mappings.get("_default_")); - for (Map.Entry<String, String> entry : mappings.entrySet()) { - if (itemsMonthlyIndexed.contains(entry.getKey())) { - indexMappings.put(entry.getKey(), entry.getValue()); - } - } - - internalCreateIndex(monthlyIndexName, indexMappings); - logger.info("{} index created.", monthlyIndexName); - } - } return monthlyIndexName; } @@ -637,7 +569,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return null; } else { String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(dateHint) : indexName); + (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexName(dateHint) : indexName); GetResponse response = client.prepareGet(index, itemType, itemId) .execute() @@ -653,7 +585,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } catch (IndexNotFoundException e) { - throw new Exception("No index found for itemType=" + clazz.getName() + " itemId=" + itemId, e); + // this can happen if we are just testing the existence of the item, it is not always an error. + return null; } catch (Exception ex) { throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, ex); } @@ -675,28 +608,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item); String itemType = item.getItemType(); String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(((TimestampedItem) item).getTimeStamp()) : indexName); + (itemsMonthlyIndexed.contains(itemType) ? 
getMonthlyIndexName(((TimestampedItem) item).getTimeStamp()) : indexName); IndexRequestBuilder indexBuilder = client.prepareIndex(index, itemType, item.getItemId()) .setSource(source); if (routingByType.containsKey(itemType)) { indexBuilder = indexBuilder.setRouting(routingByType.get(itemType)); } - if (!existingIndexNames.contains(index)) { - // index probably doesn't exist, unless something else has already created it. - if (itemsMonthlyIndexed.contains(itemType)) { - Date timeStamp = ((TimestampedItem) item).getTimeStamp(); - if (timeStamp != null) { - getMonthlyIndex(timeStamp, true); - } else { - logger.warn("Missing time stamp on item " + item + " id=" + item.getItemId() + " can't create related monthly index !"); - } - } else { - // this is not a timestamped index, should we create it anyway ? - createIndex(index); - } - } - try { if (bulkProcessor == null || !useBatching) { indexBuilder.execute().actionGet(); @@ -704,9 +622,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, bulkProcessor.add(indexBuilder.request()); } } catch (IndexNotFoundException e) { - if (existingIndexNames.contains(index)) { - existingIndexNames.remove(index); - } } return true; } catch (IOException e) { @@ -734,7 +649,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String itemType = (String) clazz.getField("ITEM_TYPE").get(null); String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName); + (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName); if (bulkProcessor == null) { client.prepareUpdate(index, itemType, itemId).setDoc(source) @@ -769,7 +684,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String itemType = (String) clazz.getField("ITEM_TYPE").get(null); String index = indexNames.containsKey(itemType) ? 
indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName); + (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName); for (int i = 0; i < scripts.length; i++) { Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]); @@ -828,7 +743,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String itemType = (String) clazz.getField("ITEM_TYPE").get(null); String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName); + (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName); Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams); @@ -934,6 +849,74 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } + + public boolean indexTemplateExists(final String templateName) { + Boolean result = new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + GetIndexTemplatesResponse getIndexTemplatesResponse = client.admin().indices().prepareGetTemplates(templateName).execute().actionGet(); + return getIndexTemplatesResponse.getIndexTemplates().size() == 1; + } + }.catchingExecuteInClassLoader(true); + if (result == null) { + return false; + } else { + return result; + } + } + + public boolean removeIndexTemplate(final String templateName) { + Boolean result = new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... 
args) { + DeleteIndexTemplateResponse deleteIndexTemplateResponse = client.admin().indices().deleteTemplate(new DeleteIndexTemplateRequest(templateName)).actionGet(); + return deleteIndexTemplateResponse.isAcknowledged(); + } + }.catchingExecuteInClassLoader(true); + if (result == null) { + return false; + } else { + return result; + } + } + + public boolean createMonthlyIndexTemplate() { + Boolean result = new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest("context-monthly-indices") + .template(indexName + "-*") + .settings("{\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" + + " \"number_of_replicas\" : " + monthlyIndexNumberOfReplicas + "\n" + + " },\n" + + " \"analysis\": {\n" + + " \"analyzer\": {\n" + + " \"folding\": {\n" + + " \"type\":\"custom\",\n" + + " \"tokenizer\": \"keyword\",\n" + + " \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n", XContentType.JSON); + Map<String, String> indexMappings = new HashMap<String, String>(); + indexMappings.put("_default_", mappings.get("_default_")); + for (Map.Entry<String, String> entry : mappings.entrySet()) { + if (itemsMonthlyIndexed.contains(entry.getKey())) { + indexMappings.put(entry.getKey(), entry.getValue()); + } + } + putIndexTemplateRequest.mappings().putAll(indexMappings); + PutIndexTemplateResponse putIndexTemplateResponse = client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet(); + return putIndexTemplateResponse.isAcknowledged(); + } + }.catchingExecuteInClassLoader(true); + if (result == null) { + return false; + } else { + return result; + } + } + public boolean createIndex(final String indexName) { Boolean result = new InClassLoaderExecute<Boolean>() { protected Boolean execute(Object... 
args) { @@ -966,7 +949,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, boolean indexExists = indicesExistsResponse.isExists(); if (indexExists) { client.admin().indices().prepareDelete(indexName).execute().actionGet(); - existingIndexNames.remove(indexName); } return indexExists; } @@ -994,14 +976,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, " }\n" + " }\n" + " }\n" + - "}\n"); + "}\n", XContentType.JSON); for (Map.Entry<String, String> entry : mappings.entrySet()) { - builder.addMapping(entry.getKey(), entry.getValue()); + builder.addMapping(entry.getKey(), entry.getValue(), XContentType.JSON); } builder.execute().actionGet(); - existingIndexNames.add(indexName); } @@ -1010,7 +991,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, client.admin().indices() .preparePutMapping(indexName) .setType(type) - .setSource(source) + .setSource(source, XContentType.JSON) .execute().actionGet(); }
