This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push: new f5eed2ccb UNOMI-737: indices reduction migration (ES document id suffixed to av… (#582) f5eed2ccb is described below commit f5eed2ccbe3b970ca91e1762e5a3e4722096fb9a Author: kevan Jahanshahi <jke...@apache.org> AuthorDate: Wed Mar 8 12:11:03 2023 +0100 UNOMI-737: indices reduction migration (ES document id suffixed to av… (#582) * UNOMI-737: indices reduction migration (ES document id suffixed to avoid conflicts) * UNOMI-737: add integration tests * UNOMI-737: add integration test * UNOMI-737: un-skip a test about purge --- .../services/impl/GroovyActionsServiceImpl.java | 25 +- .../test/java/org/apache/unomi/itests/BaseIT.java | 6 - .../unomi/itests/GroovyActionsServiceIT.java | 2 +- .../org/apache/unomi/itests/ProfileServiceIT.java | 1 - .../unomi/itests/migration/Migrate16xTo220IT.java | 45 +++- .../ElasticSearchPersistenceServiceImpl.java | 298 ++++++++++----------- .../META-INF/cxs/mappings/personaSession.json | 41 +++ .../unomi/persistence/spi/PersistenceService.java | 25 +- .../services/impl/profiles/ProfileServiceImpl.java | 46 +--- .../services/impl/rules/RulesServiceImpl.java | 22 +- .../migrate-2.2.0-05-indicesReduction.groovy | 58 ++-- 11 files changed, 264 insertions(+), 305 deletions(-) diff --git a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java index 5761cc1c5..7b215c187 100644 --- a/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java +++ b/extensions/groovy-actions/services/src/main/java/org/apache/unomi/groovy/actions/services/impl/GroovyActionsServiceImpl.java @@ -79,7 +79,6 @@ public class GroovyActionsServiceImpl implements GroovyActionsService { private static final Logger logger = LoggerFactory.getLogger(GroovyActionsServiceImpl.class.getName()); private static final String BASE_SCRIPT_NAME = "BaseScript"; - private static final String GROOVY_SOURCE_CODE_ID_SUFFIX = "-groovySourceCode"; private DefinitionsService definitionsService; private PersistenceService persistenceService; @@ -217,21 +216,20 @@ public class GroovyActionsServiceImpl implements GroovyActionsService { @Override public void remove(String id) { - String groovySourceCodeId = getGroovyCodeSourceIdForActionId(id); - if (groovyCodeSourceMap.containsKey(groovySourceCodeId)) { + if (groovyCodeSourceMap.containsKey(id)) { try { definitionsService.removeActionType( - groovyShell.parse(groovyCodeSourceMap.get(groovySourceCodeId)).getClass().getMethod("execute").getAnnotation(Action.class).id()); + groovyShell.parse(groovyCodeSourceMap.get(id)).getClass().getMethod("execute").getAnnotation(Action.class).id()); } catch (NoSuchMethodException e) { logger.error("Failed to delete the action type for the id {}", id, e); } - persistenceService.remove(groovySourceCodeId, GroovyAction.class); + persistenceService.remove(id, GroovyAction.class); } } @Override public GroovyCodeSource getGroovyCodeSource(String id) { - return groovyCodeSourceMap.get(getGroovyCodeSourceIdForActionId(id)); + return groovyCodeSourceMap.get(id); } /** @@ -245,21 +243,10 @@ public class GroovyActionsServiceImpl implements GroovyActionsService { return new GroovyCodeSource(groovyScript, actionName, "/groovy/script"); } - /** - * We use a suffix for avoiding id conflict between the actionType and the groovyAction in ElasticSearch - * Since those items are now stored in the same ES index - * @param actionName name/id of the actionType - * @return id of the groovyAction source code for query/save/storage usage. - */ - private String getGroovyCodeSourceIdForActionId(String actionName) { - return actionName + GROOVY_SOURCE_CODE_ID_SUFFIX; - } - private void saveScript(String actionName, String script) { - String groovyName = getGroovyCodeSourceIdForActionId(actionName); - GroovyAction groovyScript = new GroovyAction(groovyName, script); + GroovyAction groovyScript = new GroovyAction(actionName, script); persistenceService.save(groovyScript); - logger.info("The script {} has been persisted.", groovyName); + logger.info("The script {} has been persisted.", actionName); } private void refreshGroovyActions() { diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java index 1f5ebf1a4..7e4e20cfa 100644 --- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java @@ -214,12 +214,6 @@ public abstract class BaseIT extends KarafTestSupport { refreshPersistence(); } - protected void recreateIndex(final String itemType) { - if (persistenceService.removeIndex(itemType)) { - persistenceService.createIndex(itemType); - } - } - protected void refreshPersistence() throws InterruptedException { persistenceService.refresh(); Thread.sleep(1000); diff --git a/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java index 7fecea183..f86ff7f7e 100644 --- a/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/GroovyActionsServiceIT.java @@ -123,7 +123,7 @@ public class GroovyActionsServiceIT extends BaseIT { GroovyCodeSource groovyCodeSource = keepTrying("Failed waiting for the creation of the GroovyAction for the save test", () -> groovyActionsService.getGroovyCodeSource(UPDATE_ADDRESS_ACTION), Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); - Assert.assertEquals(UPDATE_ADDRESS_ACTION + "-groovySourceCode", groovyActionsService.getGroovyCodeSource(UPDATE_ADDRESS_ACTION).getName()); + Assert.assertEquals(UPDATE_ADDRESS_ACTION, groovyActionsService.getGroovyCodeSource(UPDATE_ADDRESS_ACTION).getName()); Assert.assertTrue(actionType.getMetadata().getId().contains(UPDATE_ADDRESS_GROOVY_ACTION)); Assert.assertEquals(2, actionType.getMetadata().getSystemTags().size()); diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java index 455ab9073..67cc5c16e 100644 --- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java @@ -331,7 +331,6 @@ public class ProfileServiceIT extends BaseIT { } @Test - @Ignore // TODO - fix test https://issues.apache.org/jira/browse/UNOMI-726 public void testMonthlyIndicesPurge() throws Exception { Date currentDate = new Date(); LocalDateTime minus10Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(10); diff --git a/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java b/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java index 8ae93daf4..7dc321268 100644 --- a/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java +++ b/itests/src/test/java/org/apache/unomi/itests/migration/Migrate16xTo220IT.java @@ -40,7 +40,10 @@ public class Migrate16xTo220IT extends BaseIT { private int sessionCount = 0; private static final int NUMBER_DUPLICATE_SESSIONS = 3; - private static final int NUMBER_PERSONA_SESSIONS = 2; + private static final List<String> oldSystemItemsIndices = Arrays.asList("context-actiontype", "context-campaign", "context-campaignevent", "context-goal", + "context-userlist", "context-propertytype", "context-scope", "context-conditiontype", "context-rule", "context-scoring", "context-segment", "context-groovyaction", "context-topic", + "context-patch", "context-jsonschema", "context-importconfig", "context-exportconfig", "context-rulestats"); + @Override @Before public void waitForStartup() throws InterruptedException { @@ -56,8 +59,9 @@ public class Migrate16xTo220IT extends BaseIT { } // Restore the snapshot HttpUtils.executePostRequest(httpClient, "http://localhost:9400/_snapshot/snapshots_repository/snapshot_1.6.x/_restore?wait_for_completion=true", "{}", null); - fillNumberEventAndSessionBeforeMigration(httpClient); + // Get initial counts of items to compare after migration + initCounts(httpClient); } catch (IOException e) { throw new RuntimeException(e); } @@ -93,6 +97,7 @@ public class Migrate16xTo220IT extends BaseIT { checkEventTypesNotPersistedAnymore(); checkForMappingUpdates(); checkEventSessionRollover2_2_0(); + checkIndexReductions2_2_0(); } /** @@ -107,17 +112,25 @@ public class Migrate16xTo220IT extends BaseIT { int newEventcount = 0; for (String eventIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-event-0")) { - JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_count", resourceAsString("migration/match_all_body_request.json"), null)); - newEventcount += jsonNode.get("count").asInt(); + newEventcount += countItems(httpClient, eventIndex, null); } int newSessioncount = 0; for (String sessionIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session-0")) { - JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + sessionIndex + "/_count", resourceAsString("migration/match_all_body_request.json"), null)); - newSessioncount += jsonNode.get("count").asInt(); + newSessioncount += countItems(httpClient, sessionIndex, null); } Assert.assertEquals(eventCount, newEventcount); - Assert.assertEquals(sessionCount - NUMBER_DUPLICATE_SESSIONS + NUMBER_PERSONA_SESSIONS, newSessioncount); + Assert.assertEquals(sessionCount - NUMBER_DUPLICATE_SESSIONS, newSessioncount); + } + + private void checkIndexReductions2_2_0() throws IOException { + // new index for system items: + Assert.assertTrue(MigrationUtils.indexExists(httpClient, "http://localhost:9400", "context-systemitems")); + + // old indices should be removed: + for (String oldSystemItemsIndex : oldSystemItemsIndices) { + Assert.assertFalse(MigrationUtils.indexExists(httpClient, "http://localhost:9400", oldSystemItemsIndex)); + } } /** @@ -298,21 +311,25 @@ public class Migrate16xTo220IT extends BaseIT { Assert.assertNull(persistenceService.load(masterProfile, ProfileAlias.class)); } - private void fillNumberEventAndSessionBeforeMigration(CloseableHttpClient httpClient) { + private void initCounts(CloseableHttpClient httpClient) { try { for (String eventIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-event-date")) { - JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_count", resourceAsString("migration/must_not_match_some_eventype_body.json"), null)); - eventCount += jsonNode.get("count").asInt(); + eventCount += countItems(httpClient, eventIndex, resourceAsString("migration/must_not_match_some_eventype_body.json")); } - for (String eventIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session-date")) { - JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_count", resourceAsString("migration/match_all_body_request.json"), null)); - sessionCount += jsonNode.get("count").asInt(); + for (String sessionIndex : MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session-date")) { + sessionCount += countItems(httpClient, sessionIndex, null); } } catch (IOException e) { throw new RuntimeException(e); } + } - + private int countItems(CloseableHttpClient httpClient, String index, String requestBody) throws IOException { + if (requestBody == null) { + requestBody = resourceAsString("migration/must_not_match_some_eventype_body.json"); + } + JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + index + "/_count", requestBody, null)); + return jsonNode.get("count").asInt(); } } diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index d14d284d9..f0929887b 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -148,22 +148,9 @@ import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.elasticsearch.index.query.QueryBuilders.termQuery; @@ -249,31 +236,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>(); private static final Map<String, String> itemTypeIndexNameMap = new HashMap<>(); + private static final Collection<String> systemItems = Arrays.asList("actionType", "campaign", "campaignevent", "goal", + "userList", "propertyType", "scope", "conditionType", "rule", "scoring", "segment", "groovyAction", "topic", + "patch", "jsonSchema", "importConfig", "exportConfig", "rulestats"); static { - itemTypeIndexNameMap.put("actionType", "systemItems"); - itemTypeIndexNameMap.put("campaign", "systemItems"); - itemTypeIndexNameMap.put("campaignevent", "systemItems"); - itemTypeIndexNameMap.put("goal", "systemItems"); - itemTypeIndexNameMap.put("userList", "systemItems"); - itemTypeIndexNameMap.put("propertyType", "systemItems"); - itemTypeIndexNameMap.put("scope", "systemItems"); - itemTypeIndexNameMap.put("conditionType", "systemItems"); - itemTypeIndexNameMap.put("rule", "systemItems"); - itemTypeIndexNameMap.put("scoring", "systemItems"); - itemTypeIndexNameMap.put("segment", "systemItems"); - itemTypeIndexNameMap.put("groovyAction", "systemItems"); - itemTypeIndexNameMap.put("topic", "systemItems"); - itemTypeIndexNameMap.put("patch", "systemItems"); - itemTypeIndexNameMap.put("jsonSchema", "systemItems"); - itemTypeIndexNameMap.put("importConfig", "systemItems"); - itemTypeIndexNameMap.put("exportConfig", "systemItems"); - itemTypeIndexNameMap.put("rulestats", "systemItems"); + for (String systemItem : systemItems) { + itemTypeIndexNameMap.put(systemItem, "systemItems"); + } itemTypeIndexNameMap.put("profile", "profile"); itemTypeIndexNameMap.put("persona", "profile"); - - itemTypeIndexNameMap.put("session", "session"); - itemTypeIndexNameMap.put("personaSession", "session"); } public void setBundleContext(BundleContext bundleContext) { @@ -840,19 +812,20 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (customItemType != null) { itemType = customItemType; } + String documentId = getDocumentIDForItemType(itemId, itemType); - String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(itemId) ? sessionAffinityCache.get(itemId) : null; + String affinityIndex = "session".equals(itemType) && sessionAffinityCache.containsKey(documentId) ? sessionAffinityCache.get(documentId) : null; if (affinityIndex == null && isItemTypeRollingOver(itemType)) { return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") { @Override public T execute(Object... args) throws Exception { if (customItemType == null) { - PartialList<T> r = query(QueryBuilders.idsQuery().addIds(itemId), null, clazz, 0, 1, null, null); + PartialList<T> r = query(QueryBuilders.idsQuery().addIds(documentId), null, clazz, 0, 1, null, null); if (r.size() > 0) { return r.get(0); } } else { - PartialList<CustomItem> r = query(QueryBuilders.idsQuery().addIds(itemId), null, customItemType, 0, 1, null, null); + PartialList<CustomItem> r = query(QueryBuilders.idsQuery().addIds(documentId), null, customItemType, 0, 1, null, null); if (r.size() > 0) { return (T) r.get(0); } @@ -861,12 +834,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } }.execute(); } else { - GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), itemId); + GetRequest getRequest = new GetRequest(affinityIndex != null ? affinityIndex : getIndex(itemType), documentId); GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); if (response.isExists()) { String sourceAsString = response.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); + setMetadata(value, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); return value; } else { return null; @@ -889,15 +862,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } - private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm, String index) { - item.setItemId(id); + private void setMetadata(Item item, long version, long seqNo, long primaryTerm, String index) { item.setVersion(version); item.setSystemMetadata(SEQ_NO, seqNo); item.setSystemMetadata(PRIMARY_TERM, primaryTerm); item.setSystemMetadata("index", index); - if (item.getItemType().equals("session") && !sessionAffinityCache.containsKey(id)) { - sessionAffinityCache.put(id, index); - } } @Override @@ -925,17 +894,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item); String itemType = item.getItemType(); - String className = item.getClass().getName(); if (item instanceof CustomItem) { itemType = ((CustomItem) item).getCustomItemType(); - className = CustomItem.class.getName() + "." + itemType; } - String itemId = item.getItemId(); - String index = item.getSystemMetadata("index") != null ? - (String) item.getSystemMetadata("index") : - getIndex(itemType); + String documentId = getDocumentIDForItemType(item.getItemId(), itemType); + String index = item.getSystemMetadata("index") != null ? (String) item.getSystemMetadata("index") : getIndex(itemType); + IndexRequest indexRequest = new IndexRequest(index); - indexRequest.id(itemId); + indexRequest.id(documentId); indexRequest.source(source, XContentType.JSON); if (!alwaysOverwrite) { @@ -958,13 +924,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (bulkProcessor == null || !useBatching) { indexRequest.setRefreshPolicy(getRefreshPolicy(itemType)); IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT); - setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); + setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); } else { bulkProcessor.add(indexRequest); } } catch (IndexNotFoundException e) { - logger.error("Could not find index {}, could not register item type {} with id {} ", - index, itemType, itemId, e); + logger.error("Could not find index {}, could not register item type {} with id {} ", index, itemType, item.getItemId(), e); return false; } return true; @@ -1015,7 +980,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (bulkProcessor == null || !useBatchingForUpdate) { UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT); - setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); + setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); } else { bulkProcessor.add(updateRequest); } @@ -1034,7 +999,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private UpdateRequest createUpdateRequest(Class clazz, Item item, Map source, boolean alwaysOverwrite) { String itemType = Item.getItemType(clazz); - UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType), item.getItemId()); + String documentId = getDocumentIDForItemType(item.getItemId(), itemType); + String index = getIndex(itemType); + + UpdateRequest updateRequest = new UpdateRequest(index, documentId); updateRequest.doc(source); if (!alwaysOverwrite) { @@ -1115,7 +1083,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); - String index = getIndex(itemType); for (int i = 0; i < scripts.length; i++) { @@ -1213,12 +1180,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); - String index = getIndex(itemType); + String documentId = getDocumentIDForItemType(item.getItemId(), itemType); Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams); - UpdateRequest updateRequest = new UpdateRequest(index, item.getItemId()); + UpdateRequest updateRequest = new UpdateRequest(index, documentId); Long seqNo = (Long) item.getSystemMetadata(SEQ_NO); Long primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM); @@ -1230,7 +1197,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, updateRequest.script(actualScript); if (bulkProcessor == null) { UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT); - setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); + setMetadata(item, response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); } else { bulkProcessor.add(updateRequest); } @@ -1266,8 +1233,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (customItemType != null) { itemType = customItemType; } + String documentId = getDocumentIDForItemType(itemId, itemType); + String index = getIndexNameForQuery(itemType); - DeleteRequest deleteRequest = new DeleteRequest(getIndexNameForQuery(itemType), itemId); + DeleteRequest deleteRequest = new DeleteRequest(index, documentId); client.delete(deleteRequest, RequestOptions.DEFAULT); return true; } catch (Exception e) { @@ -1285,78 +1254,81 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... args) throws Exception { - try { - String itemType = Item.getItemType(clazz); - QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.getQueryBuilder(query); - final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType)) - .setQuery(isItemTypeSharingIndex(itemType) ? wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder) - // Setting slices to auto will let Elasticsearch choose the number of slices to use. - // This setting will use one slice per shard, up to a certain limit. - // The delete request will be more efficient and faster than no slicing. - .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) - // Elasticsearch takes a snapshot of the index when you hit delete by query request and uses the _version of the documents to process the request. - // If a document gets updated in the meantime, it will result in a version conflict error and the delete operation will fail. - // So we explicitly set the conflict strategy to proceed in case of version conflict. - .setAbortOnVersionConflict(false) - // Remove by Query is mostly used for purge and cleaning up old data - // It's mostly used in jobs/timed tasks so we don't really care about long request - // So we increase default timeout of 1min to 10min - .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes)); - - BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); - - if (bulkByScrollResponse == null) { - logger.error("Remove by query: no response returned for query: {}", query); - return false; - } + QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.getQueryBuilder(query); + return removeByQuery(queryBuilder, clazz); + } + }.catchingExecuteInClassLoader(true); + if (result == null) { + return false; + } else { + return result; + } + } - if (bulkByScrollResponse.isTimedOut()) { - logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query); - } + public <T extends Item> boolean removeByQuery(QueryBuilder queryBuilder, final Class<T> clazz) throws Exception { + try { + String itemType = Item.getItemType(clazz); + final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType)) + .setQuery(isItemTypeSharingIndex(itemType) ? wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder) + // Setting slices to auto will let Elasticsearch choose the number of slices to use. + // This setting will use one slice per shard, up to a certain limit. + // The delete request will be more efficient and faster than no slicing. + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) + // Elasticsearch takes a snapshot of the index when you hit delete by query request and uses the _version of the documents to process the request. + // If a document gets updated in the meantime, it will result in a version conflict error and the delete operation will fail. + // So we explicitly set the conflict strategy to proceed in case of version conflict. + .setAbortOnVersionConflict(false) + // Remove by Query is mostly used for purge and cleaning up old data + // It's mostly used in jobs/timed tasks so we don't really care about long request + // So we increase default timeout of 1min to 10min + .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes)); + + BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); + + if (bulkByScrollResponse == null) { + logger.error("Remove by query: no response returned for query: {}", queryBuilder); + return false; + } - if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) || - bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { - logger.warn("Remove by query: we found some failure during the process of query: {}", query); + if (bulkByScrollResponse.isTimedOut()) { + logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, queryBuilder); + } - if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) { - for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) { - logger.warn("Remove by query, search failure: {}", searchFailure.toString()); - } - } + if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) || + bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { + logger.warn("Remove by query: we found some failure during the process of query: {}", queryBuilder); - if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { - for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) { - logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString()); - } - } + if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) { + for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) { + logger.warn("Remove by query, search failure: {}", searchFailure.toString()); } + } - if (logger.isDebugEnabled()) { - logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}", - bulkByScrollResponse.getTook().toHumanReadableString(1), - bulkByScrollResponse.getDeleted(), - bulkByScrollResponse.getBatches(), - bulkByScrollResponse.getNoops(), - bulkByScrollResponse.getVersionConflicts(), - bulkByScrollResponse.getSearchRetries(), - bulkByScrollResponse.getBulkRetries(), - query); + if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { + for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) { + logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString()); } - - return true; - } catch (Exception e) { - throw new Exception("Cannot remove by query", e); } } - }.catchingExecuteInClassLoader(true); - if (result == null) { - return false; - } else { - return result; + + if (logger.isDebugEnabled()) { + logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}", + bulkByScrollResponse.getTook().toHumanReadableString(1), + bulkByScrollResponse.getDeleted(), + bulkByScrollResponse.getBatches(), + bulkByScrollResponse.getNoops(), + bulkByScrollResponse.getVersionConflicts(), + bulkByScrollResponse.getSearchRetries(), + bulkByScrollResponse.getBulkRetries(), + queryBuilder); + } + + return true; + } catch (Exception e) { + throw new Exception("Cannot remove by query", e); } } - public boolean indexTemplateExists(final String templateName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... args) throws IOException { @@ -1444,8 +1416,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - public boolean removeIndex(final String itemType, boolean addPrefix){ - String index = addPrefix ? getIndex(itemType) : itemType; + public boolean removeIndex(final String itemType) { + String index = getIndex(itemType); Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... args) throws IOException { @@ -1465,9 +1437,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return result; } } - public boolean removeIndex(final String itemType) { - return removeIndex(itemType, true); - } private void internalCreateRolloverTemplate(String itemName) throws IOException { String rolloverAlias = indexPrefix + "-" + itemName; @@ -1830,9 +1799,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { final Class<? extends Item> clazz = item.getClass(); String itemType = Item.getItemType(clazz); + String documentId = getDocumentIDForItemType(item.getItemId(), itemType); QueryBuilder builder = QueryBuilders.boolQuery() - .must(QueryBuilders.idsQuery().addIds(item.getItemId())) + .must(QueryBuilders.idsQuery().addIds(documentId)) .must(conditionESQueryBuilderDispatcher.buildFilter(query)); return queryCount(builder, itemType) > 0; } finally { @@ -1918,29 +1888,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - @Override - public Map<String, Long> docCountPerIndex(String... indexes) { - return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".docCountPerIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { - @Override - protected Map<String, Long> execute(Object... args) throws IOException { - List<String> indexesForQuery = Stream.of(indexes).map(index -> getIndexNameForQuery(index)).collect(Collectors.toList()); - String[] itemsArray = new String[indexesForQuery.size()]; - itemsArray = indexesForQuery.toArray(itemsArray); - GetIndexRequest request = new GetIndexRequest(itemsArray); - GetIndexResponse getIndexResponse = client.indices().get(request, RequestOptions.DEFAULT); - - Map<String, Long> countPerIndex = new HashMap<>(); - - for (String index : getIndexResponse.getIndices()) { - CountRequest countRequest = new CountRequest(index); - CountResponse response = client.count(countRequest, RequestOptions.DEFAULT); - countPerIndex.put(index, response.getCount()); - } - return countPerIndex; - } - }.catchingExecuteInClassLoader(true); - } - private long queryCount(final QueryBuilder filter, final String itemType) { return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { @@ -2041,7 +1988,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // add hit to results String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); + setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); results.add(value); } @@ -2071,7 +2018,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (SearchHit searchHit : searchHits) { String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); + setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); results.add(value); } } @@ -2117,7 +2064,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // add hit to results String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); + setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); results.add(value); } } @@ -2158,7 +2105,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // add hit to results String sourceAsString = searchHit.getSourceAsString(); final CustomItem value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, CustomItem.class); - setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); + setMetadata(value, searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); results.add(value); } } @@ -2420,6 +2367,42 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public void purge(final Date date) { + // nothing, this method is deprecated since 2.2.0 + } + + @Override + public <T extends Item> void purgeTimeBasedItems(int existsNumberOfDays, Class<T> clazz) { + new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".purgeTimeBasedItems", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { + protected Boolean execute(Object... args) throws Exception { + String itemType = Item.getItemType(clazz); + + if (existsNumberOfDays > 0 && isItemTypeRollingOver(itemType)) { + // First we purge the documents + removeByQuery(QueryBuilders.rangeQuery("timeStamp").lte("now-" + existsNumberOfDays + "d"), clazz); + + // get count per index for those time based data + TreeMap<String, Long> countsPerIndex = new TreeMap<>(); + GetIndexResponse getIndexResponse = client.indices().get(new GetIndexRequest(getIndexNameForQuery(itemType)), RequestOptions.DEFAULT); + for (String index : getIndexResponse.getIndices()) { + countsPerIndex.put(index, client.count(new CountRequest(index), RequestOptions.DEFAULT).getCount()); + } + + // Check for count=0 and remove them + if (countsPerIndex.size() >= 1) { + // do not check the last index, because it's the one used to write documents + countsPerIndex.pollLastEntry(); + + for (Map.Entry<String, Long> indexCount : countsPerIndex.entrySet()) { + if (indexCount.getValue() == 0) { + client.indices().delete(new DeleteIndexRequest(indexCount.getKey()), RequestOptions.DEFAULT); + } + } + } + } + + return true; + } + }.catchingExecuteInClassLoader(true); } @Override @@ -2632,6 +2615,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return itemTypeIndexNameMap.getOrDefault(itemType, itemType); } + private String getDocumentIDForItemType(String itemId, String itemType) { + return systemItems.contains(itemType) ? (itemId + "_" + itemType.toLowerCase()) : itemId; + } + private QueryBuilder wrapWithItemTypeQuery(String itemType, QueryBuilder originalQuery) { BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery(); wrappedQuery.must(getItemTypeQueryBuilder(itemType)); @@ -2657,5 +2644,4 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return WriteRequest.RefreshPolicy.NONE; } - } diff --git a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/personaSession.json b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/personaSession.json new file mode 100644 index 000000000..c635e0285 --- /dev/null +++ b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/personaSession.json @@ -0,0 +1,41 @@ +{ + "dynamic_templates": [ + { + "all": { + "match": "*", + "match_mapping_type": "string", + "mapping": { + "type": "text", + "analyzer": "folding", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + ], + "properties": { + "duration": { + "type": "long" + }, + "timeStamp": { + "type": "date" + }, + "lastEventDate": { + "type": "date" + }, + "properties": { + "properties": { + "location": { + "type": "geo_point" + } + } + }, + "size": { + "type": "long" + } + } +} \ No newline at end of file diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java index 751a34c85..29c196a2b 100644 --- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java @@ -292,6 +292,7 @@ public interface PersistenceService { /** * @deprecated use {@link #loadCustomItem(String, String)} */ + @Deprecated CustomItem loadCustomItem(String itemId, Date dateHint, String customItemType); /** @@ -676,22 +677,19 @@ public interface PersistenceService { <T extends Item> void refreshIndex(Class<T> clazz, Date dateHint); /** - * Purges all data in the context server up to the specified date, not included. - * - * @param date the date (not included) before which we want to erase all data + * deprecated: (use: purgeTimeBasedItems instead) */ @Deprecated void purge(Date date); /** - * Retrieves the number of document per indexes. - * If the index is a rollover index, each rollover index will be return with its own number of document - * For example: with "event" as parameter, the indexes named ...-event-000001, ...-event-000002 and so one will be returned + * Purges time based data in the context server up to the specified days number of existence. + * (This only works for time based data stored in rolling over indices, it have no effect on other types) * - * @param indexes names of the indexes to count the documents - * @return Map where the key in the index name and the value is the number of document for this index + * @param existsNumberOfDays the number of days + * @param clazz the item type to be purged */ - Map<String, Long> docCountPerIndex(String... indexes); + <T extends Item> void purgeTimeBasedItems(int existsNumberOfDays, Class<T> clazz); /** * Retrieves all items of the specified Item subclass which specified ranged property is within the specified bounds, ordered according to the specified {@code sortBy} String @@ -744,15 +742,6 @@ public interface PersistenceService { */ boolean removeIndex(final String itemType); - /** - * Removes the index for the specified item type. - * - * @param itemType the item type - * @param addPrefix should add the index prefix to the itemType passed as parameter - * @return {@code true} if the operation was successful, {@code false} otherwise - */ - boolean removeIndex(final String itemType, boolean addPrefix); - /** * Removes all data associated with the provided scope. * diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java index 7912cc6de..8221f19ff 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java @@ -370,40 +370,11 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList } } - private <T extends Item> void purgeRolloverItems(int existsNumberOfDays, Class<T> clazz) { - if (existsNumberOfDays > 0) { - String conditionType = null; - String itemType = null; - - if (clazz.getName().equals(Event.class.getName())) { - conditionType = "eventPropertyCondition"; - itemType = Event.ITEM_TYPE; - } else if (clazz.getName().equals(Session.class.getName())) { - conditionType = "sessionPropertyCondition"; - itemType = Session.ITEM_TYPE; - } - - ConditionType propertyConditionType = definitionsService.getConditionType(conditionType); - if (propertyConditionType == null) { - // definition service not yet fully instantiate - return; - } - - Condition condition = new Condition(propertyConditionType); - - condition.setParameter("propertyName", "timeStamp"); - condition.setParameter("comparisonOperator", "lessThanOrEqualTo"); - condition.setParameter("propertyValueDateExpr", "now-" + existsNumberOfDays + "d"); - persistenceService.removeByQuery(condition, clazz); - deleteEmptyRolloverIndex(itemType); - } - } - @Override public void purgeSessionItems(int existsNumberOfDays) { if (existsNumberOfDays > 0) { logger.info("Purging: Sessions created since more than {} days", existsNumberOfDays); - purgeRolloverItems(existsNumberOfDays, Session.class); + persistenceService.purgeTimeBasedItems(existsNumberOfDays, Session.class); } } @@ -411,7 +382,7 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList public void purgeEventItems(int existsNumberOfDays) { if (existsNumberOfDays > 0) { logger.info("Purging: Events created since more than {} days", existsNumberOfDays); - purgeRolloverItems(existsNumberOfDays, Event.class); + persistenceService.purgeTimeBasedItems(existsNumberOfDays, Event.class); } } @@ -421,19 +392,6 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList } - public void deleteEmptyRolloverIndex(String indexName) { - TreeMap<String, Long> countsPerIndex = new TreeMap<>(persistenceService.docCountPerIndex(indexName)); - if (countsPerIndex.size() >= 1) { - // do not check the last index, because it's the one used to write documents - countsPerIndex.pollLastEntry(); - countsPerIndex.forEach((index, count) -> { - if (count == 0) { - persistenceService.removeIndex(index, false); - } - }); - } - } - private void initializePurge() { logger.info("Purge: Initializing"); diff --git a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java index fafec7bcf..ea6109064 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java @@ -46,8 +46,6 @@ import java.util.stream.Collectors; public class RulesServiceImpl implements RulesService, EventListenerService, SynchronousBundleListener { - public static final String RULE_QUERY_PREFIX = "rule_"; - private static final String RULE_STAT_ID_SUFFIX = "-stat"; public static final String TRACKED_PARAMETER = "trackedConditionParameters"; private static final Logger logger = LoggerFactory.getLogger(RulesServiceImpl.class.getName()); @@ -253,10 +251,9 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn } private RuleStatistics getLocalRuleStatistics(Rule rule) { - String ruleStatisticsId = getRuleStatisticId(rule.getItemId()); - RuleStatistics ruleStatistics = this.allRuleStatistics.get(ruleStatisticsId); + RuleStatistics ruleStatistics = this.allRuleStatistics.get(rule.getItemId()); if (ruleStatistics == null) { - ruleStatistics = new RuleStatistics(ruleStatisticsId); + ruleStatistics = new RuleStatistics(rule.getItemId()); } return ruleStatistics; } @@ -267,10 +264,6 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics); } - private String getRuleStatisticId(String ruleID) { - return ruleID + RULE_STAT_ID_SUFFIX; - } - public void refreshRules() { try { // we use local variables to make sure we quickly switch the collections since the refresh is called often @@ -344,17 +337,14 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn @Override public RuleStatistics getRuleStatistics(String ruleId) { - String ruleStatisticsId = getRuleStatisticId(ruleId); - if (allRuleStatistics.containsKey(ruleStatisticsId)) { - return allRuleStatistics.get(ruleStatisticsId); + if (allRuleStatistics.containsKey(ruleId)) { + return allRuleStatistics.get(ruleId); } - return persistenceService.load(ruleStatisticsId, RuleStatistics.class); + return persistenceService.load(ruleId, RuleStatistics.class); } public Map<String, RuleStatistics> getAllRuleStatistics() { - return allRuleStatistics.keySet().stream() - .collect(Collectors.toMap(key -> key.endsWith(RULE_STAT_ID_SUFFIX) ? - key.substring(0, key.length() - RULE_STAT_ID_SUFFIX.length()) : key, allRuleStatistics::get)); + return allRuleStatistics; } @Override diff --git a/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy b/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy index 8a07a5278..631f0bced 100644 --- a/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy +++ b/tools/shell-commands/src/main/resources/META-INF/cxs/migration/migrate-2.2.0-05-indicesReduction.groovy @@ -23,6 +23,28 @@ MigrationContext context = migrationContext String esAddress = context.getConfigString("esAddress") String indexPrefix = context.getConfigString("indexPrefix") String baseSettings = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.0.0/base_index_mapping.json") +def indicesToReduce = [ + actiontype: [reduceTo: "systemitems", renameId: true], + campaign: [reduceTo: "systemitems", renameId: true], + campaignevent: [reduceTo: "systemitems", renameId: true], + goal: [reduceTo: "systemitems", renameId: true], + userlist: [reduceTo: "systemitems", renameId: true], + propertytype: [reduceTo: "systemitems", renameId: true], + scope: [reduceTo: "systemitems", renameId: true], + conditiontype: [reduceTo: "systemitems", renameId: true], + rule: [reduceTo: "systemitems", renameId: true], + scoring: [reduceTo: "systemitems", renameId: true], + segment: [reduceTo: "systemitems", renameId: true], + topic: [reduceTo: "systemitems", renameId: true], + patch: [reduceTo: "systemitems", renameId: true], + jsonschema: [reduceTo: "systemitems", renameId: true], + importconfig: [reduceTo: "systemitems", renameId: true], + exportconfig: [reduceTo: "systemitems", renameId: true], + rulestats: [reduceTo: "systemitems", renameId: true], + groovyaction: [reduceTo: "systemitems", renameId: true], + + persona: [reduceTo: "profile", renameId: false] +] context.performMigrationStep("2.2.0-create-systemItems-index", () -> { if (!MigrationUtils.indexExists(context.getHttpClient(), esAddress, "${indexPrefix}-systemitems")) { @@ -32,44 +54,20 @@ context.performMigrationStep("2.2.0-create-systemItems-index", () -> { } }) -def indicesToReduce = [ - actiontype: "systemitems", - campaign: "systemitems", - campaignevent: "systemitems", - goal: "systemitems", - userlist: "systemitems", - propertytype: "systemitems", - scope: "systemitems", - conditiontype: "systemitems", - rule: "systemitems", - scoring: "systemitems", - segment: "systemitems", - topic: "systemitems", - patch: "systemitems", - jsonschema: "systemitems", - importconfig: "systemitems", - exportconfig: "systemitems", - rulestats: "systemitems", - groovyaction: "systemitems", - persona: "profile", - personasession: "session" -] -def indicesToSuffixIds = [ - rulestats: "-stat", - groovyaction: "-groovySourceCode" -] indicesToReduce.each { indexToReduce -> context.performMigrationStep("2.2.0-reduce-${indexToReduce.key}", () -> { if (MigrationUtils.indexExists(context.getHttpClient(), esAddress, "${indexPrefix}-${indexToReduce.key}")) { def painless = null // check if we need to update the ids of those items first - if (indicesToSuffixIds.containsKey(indexToReduce.key)) { - painless = MigrationUtils.getFileWithoutComments(bundleContext, "requestBody/2.2.0/suffix_ids.painless").replace("#ID_SUFFIX", indicesToSuffixIds.get(indexToReduce.key)) + if (indexToReduce.value.renameId) { + painless = MigrationUtils.getFileWithoutComments(bundleContext, "requestBody/2.2.0/suffix_ids.painless").replace("#ID_SUFFIX", "_${indexToReduce.key}") } // move items - MigrationUtils.moveToIndex(context.getHttpClient(), bundleContext, esAddress, "${indexPrefix}-${indexToReduce.key}", "${indexPrefix}-${indexToReduce.value}", painless) + def reduceToIndex = "${indexPrefix}-${indexToReduce.value.reduceTo}" + MigrationUtils.moveToIndex(context.getHttpClient(), bundleContext, esAddress, "${indexPrefix}-${indexToReduce.key}", reduceToIndex, painless) MigrationUtils.deleteIndex(context.getHttpClient(), esAddress, "${indexPrefix}-${indexToReduce.key}") - HttpUtils.executePostRequest(context.getHttpClient(), esAddress + "/${indexPrefix}-${indexToReduce.value}/_refresh", null, null); + + HttpUtils.executePostRequest(context.getHttpClient(), esAddress + "/${reduceToIndex}/_refresh", null, null); MigrationUtils.waitForYellowStatus(context.getHttpClient(), esAddress, context); } })