This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch monthlyToRolloverIndicesPOC in repository https://gitbox.apache.org/repos/asf/unomi.git
commit b77f91f87b322fb761722d75fc823afe05886344 Author: Kevan <[email protected]> AuthorDate: Wed Jan 11 17:07:57 2023 +0100 UNOMI-724: base POC and implem for rollover system to replace monthly indices --- .../unomi/privacy/internal/PrivacyServiceImpl.java | 1 + .../ElasticSearchPersistenceServiceImpl.java | 198 +++++++++++---------- .../unomi/persistence/spi/PersistenceService.java | 20 +-- .../actions/MergeProfilesOnPropertyAction.java | 5 + .../services/impl/profiles/ProfileServiceImpl.java | 11 +- 5 files changed, 122 insertions(+), 113 deletions(-) diff --git a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java index 9c59f3d2c..25633859c 100644 --- a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java +++ b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java @@ -142,6 +142,7 @@ public class PrivacyServiceImpl implements PrivacyService { persistenceService.save(session); List<Event> events = eventService.searchEvents(session.getItemId(), new String[0], null, 0, -1, null).getList(); for (Event event : events) { + // TODO dateHint not supported anymore here persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", newProfile.getItemId()); } } diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 63150feec..1b053426d 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -48,10 +48,10 @@ import org.apache.unomi.persistence.spi.aggregate.IpRangeAggregate; import org.apache.unomi.persistence.spi.aggregate.NumericRangeAggregate; import org.apache.unomi.persistence.spi.aggregate.TermsAggregate; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest; +import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; @@ -83,6 +83,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.client.core.MainResponse; +import org.elasticsearch.client.indexlifecycle.*; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; @@ -452,6 +453,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, throw new Exception("ElasticSearch version is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !"); } + createMonthlyIndexLifecyclePolicy(); + loadPredefinedMappings(bundleContext, false); // load predefined mappings and condition dispatchers of any bundles that were started before this one. 
@@ -461,8 +464,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - createMonthlyIndexTemplate(); - if (client != null && bulkProcessor == null) { bulkProcessor = getBulkProcessor(); } @@ -677,7 +678,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - private void loadPredefinedMappings(BundleContext bundleContext, boolean createMapping) { + private void loadPredefinedMappings(BundleContext bundleContext, boolean forceUpdateMapping) { Enumeration<URL> predefinedMappings = bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true); if (predefinedMappings == null) { return; @@ -692,14 +693,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, mappings.put(name, mappingSource); - String itemIndexName = getIndex(name, new Date()); - if (!client.indices().exists(new GetIndexRequest(itemIndexName), RequestOptions.DEFAULT)) { - logger.info("{} index doesn't exist yet, creating it...", itemIndexName); - internalCreateIndex(itemIndexName, mappingSource); - } else { - logger.info("Found index {}", itemIndexName); - if (createMapping) { - logger.info("Updating mapping for {}", itemIndexName); + if (!createIndex(name)) { + logger.info("Found index for type {}", name); + if (forceUpdateMapping) { + logger.info("Updating mapping for {}", name); createMapping(name, mappingSource); } } @@ -773,7 +770,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, itemType = customItemType; } - if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) { + if (itemsMonthlyIndexed.contains(itemType)) { return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") { @Override public T execute(Object... 
args) throws Exception { @@ -792,12 +789,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } }.execute(); } else { - GetRequest getRequest = new GetRequest(getIndex(itemType, dateHint), itemId); + GetRequest getRequest = new GetRequest(getIndex(itemType), itemId); GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); if (response.isExists()) { String sourceAsString = response.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm()); + setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); return value; } else { return null; @@ -820,11 +817,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } - private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm) { + private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm, String index) { item.setItemId(id); item.setVersion(version); item.setSystemMetadata(SEQ_NO, seqNo); item.setSystemMetadata(PRIMARY_TERM, primaryTerm); + item.setSystemMetadata("index", index); } @Override @@ -858,7 +856,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, className = CustomItem.class.getName() + "." + itemType; } String itemId = item.getItemId(); - String index = getIndex(itemType, itemsMonthlyIndexed.contains(itemType) ? ((TimestampedItem) item).getTimeStamp() : null); + String index = item.getSystemMetadata("index") != null ? 
+ (String) item.getSystemMetadata("index") : + getIndex(itemType); IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(itemId); indexRequest.source(source, XContentType.JSON); @@ -884,7 +884,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (bulkProcessor == null || !useBatching) { indexRequest.setRefreshPolicy(getRefreshPolicy(itemType)); IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT); - setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm()); + setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); } else { bulkProcessor.add(indexRequest); } @@ -921,11 +921,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... 
args) throws Exception { try { - UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite); + UpdateRequest updateRequest = createUpdateRequest(clazz, item, source, alwaysOverwrite); if (bulkProcessor == null || !useBatchingForUpdate) { UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT); - setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm()); + setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex()); } else { bulkProcessor.add(updateRequest); } @@ -942,9 +942,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - private UpdateRequest createUpdateRequest(Class clazz, Date dateHint, Item item, Map source, boolean alwaysOverwrite) { + private UpdateRequest createUpdateRequest(Class clazz, Item item, Map source, boolean alwaysOverwrite) { String itemType = Item.getItemType(clazz); - UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId()); + UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType), item.getItemId()); updateRequest.doc(source); if (!alwaysOverwrite) { @@ -970,7 +970,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, BulkRequest bulkRequest = new BulkRequest(); items.forEach((item, source) -> { - UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite); + UpdateRequest updateRequest = createUpdateRequest(clazz, item, source, alwaysOverwrite); bulkRequest.add(updateRequest); }); @@ -999,7 +999,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (int i = 0; i < scripts.length; i++) { builtScripts[i] = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]); } - return updateWithQueryAndScript(dateHint, clazz, builtScripts, conditions); + return 
updateWithQueryAndScript(clazz, builtScripts, conditions); } @Override @@ -1008,16 +1008,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (int i = 0; i < scripts.length; i++) { builtScripts[i] = new Script(ScriptType.STORED, null, scripts[i], scriptParams[i]); } - return updateWithQueryAndScript(dateHint, clazz, builtScripts, conditions); + return updateWithQueryAndScript(clazz, builtScripts, conditions); } - private boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final Script[] scripts, final Condition[] conditions) { + private boolean updateWithQueryAndScript(final Class<?> clazz, final Script[] scripts, final Condition[] conditions) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); - String index = getIndex(itemType, dateHint); + String index = getIndex(itemType); for (int i = 0; i < scripts.length; i++) { RefreshRequest refreshRequest = new RefreshRequest(index); @@ -1110,7 +1110,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); - String index = getIndex(itemType, dateHint); + String index = getIndex(itemType); Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams); @@ -1126,7 +1126,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, updateRequest.script(actualScript); if (bulkProcessor == null) { UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT); - setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm()); + setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), 
response.getIndex()); } else { bulkProcessor.add(updateRequest); } @@ -1281,40 +1281,23 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - public boolean createMonthlyIndexTemplate() { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { + public boolean createMonthlyIndexLifecyclePolicy() { + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexLifecyclePolicy", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... args) throws IOException { - boolean executedSuccessfully = true; - for (String itemName : itemsMonthlyIndexed) { - PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexPrefix + "-" + itemName + "-date-template") - .patterns(Collections.singletonList(getMonthlyIndexForQuery(itemName))) - .order(1) - .settings("{\n" + - " \"index\" : {\n" + - " \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" + - " \"number_of_replicas\" : " + monthlyIndexNumberOfReplicas + ",\n" + - " \"mapping.total_fields.limit\" : " + monthlyIndexMappingTotalFieldsLimit + ",\n" + - " \"max_docvalue_fields_search\" : " + monthlyIndexMaxDocValueFieldsSearch + "\n" + - " },\n" + - " \"analysis\": {\n" + - " \"analyzer\": {\n" + - " \"folding\": {\n" + - " \"type\":\"custom\",\n" + - " \"tokenizer\": \"keyword\",\n" + - " \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" + - " }\n" + - " }\n" + - " }\n" + - "}\n", XContentType.JSON); - if (mappings.get(itemName) == null) { - logger.warn("Couldn't find mapping for item {}, won't create monthly index template", itemName); - return false; - } - putIndexTemplateRequest.mapping(mappings.get(itemName), XContentType.JSON); - AcknowledgedResponse putIndexTemplateResponse = 
client.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT); - executedSuccessfully &= putIndexTemplateResponse.isAcknowledged(); - } - return executedSuccessfully; + // Create the lifecycle policy for monthly indices + Map<String, Phase> phases = new HashMap<>(); + Map<String, LifecycleAction> hotActions = new HashMap<>(); + // TODO configure the rollover correctly, here it's 50000 bytes to test the rollover (5 sessions should trigger the rollover) + hotActions.put(RolloverAction.NAME, new RolloverAction(new ByteSizeValue(50000, ByteSizeUnit.BYTES), null, null)); + phases.put("hot", new Phase("hot", TimeValue.ZERO, hotActions)); + + Map<String, LifecycleAction> deleteActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction()); + phases.put("delete", new Phase("delete", new TimeValue(90, TimeUnit.DAYS), deleteActions)); + + LifecyclePolicy policy = new LifecyclePolicy("monthly-index-policy", phases); + PutLifecyclePolicyRequest request = new PutLifecyclePolicyRequest(policy); + org.elasticsearch.client.core.AcknowledgedResponse putLifecyclePolicy = client.indexLifecycle().putLifecyclePolicy(request, RequestOptions.DEFAULT); + return putLifecyclePolicy.isAcknowledged(); } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -1325,15 +1308,22 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } public boolean createIndex(final String itemType) { - String index = getIndex(itemType); Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... 
args) throws IOException { + String index = getIndex(itemType); GetIndexRequest getIndexRequest = new GetIndexRequest(index); boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT); + if (!indexExists) { - internalCreateIndex(index, mappings.get(itemType)); + if (itemsMonthlyIndexed.contains(itemType)) { + internalCreateMonthlyIndexTemplate(itemType); + internalCreateMonthlyIndex(index); + } else { + internalCreateIndex(index, mappings.get(itemType)); + } } + return !indexExists; } }.catchingExecuteInClassLoader(true); @@ -1367,6 +1357,47 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } + private void internalCreateMonthlyIndexTemplate(String itemName) throws IOException { + String rolloverAlias = indexPrefix + "-" + itemName; + PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(rolloverAlias + "-date-template") + .patterns(Collections.singletonList(getMonthlyIndexForQuery(itemName))) + .order(1) + .settings("{\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" + + " \"number_of_replicas\" : " + monthlyIndexNumberOfReplicas + ",\n" + + " \"mapping.total_fields.limit\" : " + monthlyIndexMappingTotalFieldsLimit + ",\n" + + " \"max_docvalue_fields_search\" : " + monthlyIndexMaxDocValueFieldsSearch + ",\n" + + " \"lifecycle.name\": \"monthly-index-policy\",\n" + + " \"lifecycle.rollover_alias\": \"" + rolloverAlias + "\"" + + "" + + " },\n" + + " \"analysis\": {\n" + + " \"analyzer\": {\n" + + " \"folding\": {\n" + + " \"type\":\"custom\",\n" + + " \"tokenizer\": \"keyword\",\n" + + " \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n", XContentType.JSON); + if (mappings.get(itemName) == null) { + logger.warn("Couldn't find mapping for item {}, won't create monthly index template", itemName); + return; + } + putIndexTemplateRequest.mapping(mappings.get(itemName), XContentType.JSON); + 
client.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT); + } + + private void internalCreateMonthlyIndex(String indexName) throws IOException { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName + "-000001") + .alias(new Alias(indexName).writeIndex(true)); + CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + logger.info("Index created: [{}], acknowledge: [{}], shards acknowledge: [{}]", createIndexResponse.index(), + createIndexResponse.isAcknowledged(), createIndexResponse.isShardsAcknowledged()); + } + private void internalCreateIndex(String indexName, String mappingSource) throws IOException { CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); createIndexRequest.settings("{\n" + @@ -1398,16 +1429,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public void createMapping(String type, String source) { try { - if (itemsMonthlyIndexed.contains(type)) { - createMonthlyIndexTemplate(); - String indexName = getIndex(type, new Date()); - GetIndexRequest getIndexRequest = new GetIndexRequest(indexName); - if (client.indices().exists(getIndexRequest, RequestOptions.DEFAULT)) { - putMapping(source, indexName); - } - } else { - putMapping(source, getIndex(type)); - } + putMapping(source, getIndex(type)); } catch (IOException ioe) { logger.error("Error while creating mapping for type " + type + " and source " + source, ioe); } @@ -1611,7 +1633,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, //Index the query = register it in the percolator try { logger.info("Saving query : " + queryName); - String index = getIndex(".percolator", null); + String index = getIndex(".percolator"); IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(queryName); indexRequest.source(query, XContentType.JSON); @@ -1645,7 +1667,7 @@ public class 
ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Boolean execute(Object... args) throws Exception { //Index the query = register it in the percolator try { - String index = getIndex(".percolator", null); + String index = getIndex(".percolator"); DeleteRequest deleteRequest = new DeleteRequest(index); deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); client.delete(deleteRequest, RequestOptions.DEFAULT); @@ -1884,7 +1906,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // add hit to results String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm()); + setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); results.add(value); } @@ -1914,7 +1936,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (SearchHit searchHit : searchHits) { String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm()); + setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); results.add(value); } } @@ -1960,7 +1982,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // add hit to results String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm()); + setMetadata(value, searchHit.getId(), 
searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); results.add(value); } } @@ -2001,7 +2023,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // add hit to results String sourceAsString = searchHit.getSourceAsString(); final CustomItem value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, CustomItem.class); - setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm()); + setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm(), searchHit.getIndex()); results.add(value); } } @@ -2244,7 +2266,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Boolean execute(Object... args) { try { String itemType = Item.getItemType(clazz); - String index = getIndex(itemType, dateHint); + String index = getIndex(itemType); client.indices().refresh(Requests.refreshRequest(index), RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace();//TODO manage ES7 @@ -2485,27 +2507,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } private String getIndexNameForQuery(String itemType) { - return itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexForQuery(itemType) : getIndex(itemType, null); + return itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexForQuery(itemType) : getIndex(itemType); } private String getMonthlyIndexForQuery(String itemType) { - return indexPrefix + "-" + itemType.toLowerCase() + "-" + INDEX_DATE_PREFIX + "*"; - } - - private String getIndex(String itemType, Date dateHint) { - String indexItemTypePart = itemsMonthlyIndexed.contains(itemType) && dateHint != null ? 
itemType + "-" + getMonthlyIndexPart(dateHint) : itemType; - return getIndex(indexItemTypePart); + return indexPrefix + "-" + itemType.toLowerCase() + "-*"; } private String getIndex(String indexItemTypePart) { return (indexPrefix + "-" + indexItemTypePart).toLowerCase(); } - private String getMonthlyIndexPart(Date date) { - String d = new SimpleDateFormat("yyyy-MM").format(date); - return INDEX_DATE_PREFIX + d; - } - private WriteRequest.RefreshPolicy getRefreshPolicy(String itemType) { if (itemTypeToRefreshPolicy.containsKey(itemType)) { return itemTypeToRefreshPolicy.get(itemType); diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java index 625a5948e..a2770e2d5 100644 --- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java @@ -125,7 +125,7 @@ public interface PersistenceService { * Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map. 
* * @param item the item we want to update - * @param dateHint a Date helping in identifying where the item is located + * @param dateHint deprecated * @param clazz the Item subclass of the item to update * @param source a Map with entries specifying as key the property name to update and as value its new value * @return {@code true} if the update was successful, {@code false} otherwise @@ -137,7 +137,7 @@ public interface PersistenceService { * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))} * * @param item the item we want to update - * @param dateHint a Date helping in identifying where the item is located + * @param dateHint deprecated * @param clazz the Item subclass of the item to update * @param propertyName the name of the property to update * @param propertyValue the new value of the property @@ -149,7 +149,7 @@ public interface PersistenceService { * Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map. * * @param item the item we want to update - * @param dateHint a Date helping in identifying where the item is located + * @param dateHint deprecated * @param clazz the Item subclass of the item to update * @param source a Map with entries specifying as key the property name to update and as value its new value * @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving @@ -162,7 +162,7 @@ public interface PersistenceService { * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))} * * @param items A map the consist of item (key) and properties to update (value) - * @param dateHint a Date helping in identifying where the item is located + * @param dateHint deprecated * @param clazz the Item subclass of the item to update * @return List of failed Items Ids, if all succesful then returns an empty list. 
if the whole operation failed then will return null */ @@ -173,7 +173,7 @@ public interface PersistenceService { * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))} * * @param item the item we want to update - * @param dateHint a Date helping in identifying where the item is located + * @param dateHint deprecated * @param clazz the Item subclass of the item to update * @param script inline script * @param scriptParams script params @@ -185,7 +185,7 @@ public interface PersistenceService { * Updates the items of the specified class by a query with a new property value for the specified property name * based on provided scripts and script parameters * - * @param dateHint a Date helping in identifying where the item is located + * @param dateHint deprecated * @param clazz the Item subclass of the item to update * @param scripts inline scripts array * @param scriptParams script params array @@ -198,7 +198,7 @@ public interface PersistenceService { * Updates the items of the specified class by a query with a new property value for the specified property name * based on provided stored scripts and script parameters * - * @param dateHint a Date helping in identifying where the item is located + * @param dateHint deprecated * @param clazz the Item subclass of the item to update * @param scripts Stored scripts name * @param scriptParams script params array @@ -230,7 +230,7 @@ public interface PersistenceService { * * @param <T> the type of the Item subclass we want to retrieve * @param itemId the identifier of the item we want to retrieve - * @param dateHint a Date helping in identifying where the item is located + * @param dateHint deprecated * @param clazz the {@link Item} subclass of the item we want to retrieve * @return the item identified with the specified identifier and with the specified Item subclass if it exists, {@code null} otherwise */ @@ -239,7 +239,7 @@ public interface PersistenceService { /** * Load a custom 
item type identified by an identifier, an optional date hint and the identifier of the custom item type * @param itemId the identifier of the custom type we want to retrieve - * @param dateHint an optional Date object if the custom item types are stored by date + * @param dateHint deprecated * @param customItemType an identifier of the custom item type to load * @return the CustomItem instance with the specified identifier and the custom item type if it exists, {@code null} otherwise */ @@ -610,7 +610,7 @@ public interface PersistenceService { * Updates the persistence's engine specific index. * * @param clazz will use an index by class type - * @param dateHint for index with time, can be null + * @param dateHint deprecated * @param <T> a class that extends Item */ <T extends Item> void refreshIndex(Class<T> clazz, Date dateHint); diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java index 752068594..9e64ef075 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java @@ -162,6 +162,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { // Update current event explicitly, as it might not return from search query if there wasn't a refresh in ES if (!StringUtils.equals(profileId, masterProfileId)) { if (currentEvent.isPersistent()) { + // TODO dateHint not supported anymore here persistenceService.update(currentEvent, currentEvent.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? 
null : masterProfileId); } } @@ -169,6 +170,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { for (Profile profile : profiles) { String profileId = profile.getItemId(); if (!StringUtils.equals(profileId, masterProfileId)) { + // TODO consider update by query and/or script List<Session> sessions = persistenceService.query("profileId", profileId, null, Session.class); if (currentSession != null) { if (masterProfileId.equals(profileId) && !sessions.contains(currentSession)) { @@ -177,12 +179,15 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { } for (Session session : sessions) { + // TODO dateHint not supported anymore here persistenceService.update(session, session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId); } + // TODO consider update by query and/or script List<Event> events = persistenceService.query("profileId", profileId, null, Event.class); for (Event event : events) { if (!event.getItemId().equals(currentEvent.getItemId())) { + // TODO dateHint not supported anymore here persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? 
null : masterProfileId); } } diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java index bc93fd0c5..8ec136ef2 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java @@ -848,16 +848,7 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList } public Session loadSession(String sessionId, Date dateHint) { - Session s = persistenceService.load(sessionId, dateHint, Session.class); - if (s == null && dateHint != null) { - GregorianCalendar gc = new GregorianCalendar(); - gc.setTime(dateHint); - if (gc.get(Calendar.DAY_OF_MONTH) == 1) { - gc.add(Calendar.DAY_OF_MONTH, -1); - s = persistenceService.load(sessionId, gc.getTime(), Session.class); - } - } - return s; + return persistenceService.load(sessionId, null, Session.class); } public Session saveSession(Session session) {
