This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch purge_system in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 0cde2fa41042715100e7a5a12dcde3e89d950845 Author: Kevan <[email protected]> AuthorDate: Thu Oct 27 18:31:44 2022 +0200 UNOMI-670: improve purge system --- .../apache/unomi/api/services/ProfileService.java | 19 +++ .../org/apache/unomi/itests/ProfileServiceIT.java | 141 ++++++++++++++++++- .../ElasticSearchPersistenceServiceImpl.java | 155 +++++++-------------- .../resources/OSGI-INF/blueprint/blueprint.xml | 4 - .../services/impl/profiles/ProfileServiceImpl.java | 123 +++++++++------- 5 files changed, 277 insertions(+), 165 deletions(-) diff --git a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java index e51091961..86fac9f61 100644 --- a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java +++ b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java @@ -400,4 +400,23 @@ public interface ProfileService { * in specific scenarios such as integration tests. */ void refresh(); + + /** + * Purge (delete) profiles + * example: purge only the profiles that have been inactive for at least 10 days: + * purgeProfiles(10, 0); + * + * example: purge only the profiles created at least 30 days ago: + * purgeProfiles(0, 30); + * + * @param inactiveNumberOfDays purges profiles whose last visit is at least this number of days old (a value of 0 or less disables this criterion) + * @param existsNumberOfDays purges profiles whose first visit is at least this number of days old (a value of 0 or less disables this criterion) + */ + void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays); + + /** + * Purge (delete) monthly indices by removing old indices + * @param existsNumberOfMonths used to remove monthly indices older than this number of months + */ + void purgeMonthlyItems(int existsNumberOfMonths); } diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java index c85a83a59..30bf8dcf1 100644 --- 
a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java @@ -36,10 +36,9 @@ import org.slf4j.LoggerFactory; import javax.inject.Inject; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.*; import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; @@ -65,7 +64,7 @@ public class ProfileServiceIT extends BaseIT { @After public void tearDown() throws InterruptedException { - removeItems(Profile.class, ProfileAlias.class); + removeItems(Profile.class, ProfileAlias.class, Event.class, Session.class); } @Test @@ -266,4 +265,136 @@ public class ProfileServiceIT extends BaseIT { // do nothing, it's expected } } + + @Test + public void testProfilePurge() throws Exception { + Date currentDate = new Date(); + LocalDateTime minus10Days = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusDays(10); + LocalDateTime minus30Days = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusDays(30); + Date currentDateMinus10Days = Date.from(minus10Days.atZone(ZoneId.systemDefault()).toInstant()); + Date currentDateMinus30Days = Date.from(minus30Days.atZone(ZoneId.systemDefault()).toInstant()); + + long originalProfilesCount = persistenceService.getAllItemsCount(Profile.ITEM_TYPE); + + // create inactive profiles since 10 days + for (int i = 0; i < 150; i++) { + Profile profile = new Profile("inactive-profile-to-be-purge-" + i); + profile.setProperty("lastVisit", currentDateMinus10Days); + profile.setProperty("firstVisit", currentDateMinus10Days); + persistenceService.save(profile); + } + + // create active profiles created 30 days ago + for (int i = 0; i < 150; i++) { + Profile profile = new Profile("old-profile-to-be-purge-" + i); + profile.setProperty("lastVisit", currentDate); 
+ profile.setProperty("firstVisit", currentDateMinus30Days); + persistenceService.save(profile); + } + + // create active and recent profile + for (int i = 0; i < 150; i++) { + Profile profile = new Profile("active-profile" + i); + profile.setProperty("lastVisit", currentDate); + profile.setProperty("firstVisit", currentDate); + persistenceService.save(profile); + } + + keepTrying("Failed waiting for all profiles to be available", () -> profileService.getAllProfilesCount(), + (count) -> count == (450 + originalProfilesCount), 1000, 100); + + // Try purge with 0 params: should have no effects + profileService.purgeProfiles(0, 0); + keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(), + (count) -> count == (450 + originalProfilesCount), 1000, 100); + + // Try purge inactive profiles since 20 days, should have no effects there is no such profiles + profileService.purgeProfiles(20, 0); + keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(), + (count) -> count == (450 + originalProfilesCount), 1000, 100); + + // Try purge inactive profiles since 20 days and/or older than 40 days, should have no effects there is no such profiles + profileService.purgeProfiles(20, 40); + keepTrying("We should still have 450 profiles", () -> profileService.getAllProfilesCount(), + (count) -> count == (450 + originalProfilesCount), 1000, 100); + + // Try purge inactive profiles since 5 days + profileService.purgeProfiles(5, 0); + keepTrying("Inactive profiles should be purge so we should have 300 profiles now", () -> profileService.getAllProfilesCount(), + (count) -> count == (300 + originalProfilesCount), 1000, 100); + + // Try purge inactive profiles since 5 days and/or older than 25 days + profileService.purgeProfiles(5, 25); + keepTrying("Older profiles should be purge so we should have 150 profiles now", () -> profileService.getAllProfilesCount(), + (count) -> count == (150 + originalProfilesCount), 1000, 
100); + } + + @Test + public void testMonthlyIndicesPurge() throws Exception { + Date currentDate = new Date(); + LocalDateTime minus10Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(10); + LocalDateTime minus30Months = LocalDateTime.ofInstant(currentDate.toInstant(), ZoneId.systemDefault()).minusMonths(30); + Date currentDateMinus10Months = Date.from(minus10Months.atZone(ZoneId.systemDefault()).toInstant()); + Date currentDateMinus30Months = Date.from(minus30Months.atZone(ZoneId.systemDefault()).toInstant()); + + long originalSessionsCount = persistenceService.getAllItemsCount(Session.ITEM_TYPE); + long originalEventsCount = persistenceService.getAllItemsCount(Event.ITEM_TYPE); + + Profile profile = new Profile("dummy-profile-monthly-purge-test"); + persistenceService.save(profile); + + // create 10 months old items + for (int i = 0; i < 150; i++) { + Session session = new Session("10months-old-session-" + i, profile, currentDateMinus10Months, "dummy-scope"); + persistenceService.save(session); + persistenceService.save(new Event("10months-old-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDateMinus10Months)); + } + + // create 30 months old items + for (int i = 0; i < 150; i++) { + Session session = new Session("30months-old-session-" + i, profile, currentDateMinus30Months, "dummy-scope"); + persistenceService.save(session); + persistenceService.save(new Event("30months-old-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDateMinus30Months)); + } + + // create recent items (timestamped with the current date) + for (int i = 0; i < 150; i++) { + Session session = new Session("recent-session-" + i, profile, currentDate, "dummy-scope"); + persistenceService.save(session); + persistenceService.save(new Event("recent-event-" + i, "view", session, profile, "dummy-scope", null, null, currentDate)); + } + + keepTrying("Sessions number should be 450", () -> 
persistenceService.getAllItemsCount(Session.ITEM_TYPE), + (count) -> count == (450 + originalSessionsCount), 1000, 100); + keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE), + (count) -> count == (450 + originalEventsCount), 1000, 100); + + // Should have no effect + profileService.purgeMonthlyItems(0); + keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE), + (count) -> count == (450 + originalSessionsCount), 1000, 100); + keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE), + (count) -> count == (450 + originalEventsCount), 1000, 100); + + // Should have no effect: there are no monthly items older than 40 months + profileService.purgeMonthlyItems(40); + keepTrying("Sessions number should be 450", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE), + (count) -> count == (450 + originalSessionsCount), 1000, 100); + keepTrying("Events number should be 450", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE), + (count) -> count == (450 + originalEventsCount), 1000, 100); + + // Should purge monthly items older than 25 months + profileService.purgeMonthlyItems(25); + keepTrying("Sessions number should be 300", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE), + (count) -> count == (300 + originalSessionsCount), 1000, 100); + keepTrying("Events number should be 300", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE), + (count) -> count == (300 + originalEventsCount), 1000, 100); + + // Should purge monthly items older than 5 months + profileService.purgeMonthlyItems(5); + keepTrying("Sessions number should be 150", () -> persistenceService.getAllItemsCount(Session.ITEM_TYPE), + (count) -> count == (150 + originalSessionsCount), 1000, 100); + keepTrying("Events number should be 150", () -> persistenceService.getAllItemsCount(Event.ITEM_TYPE), + (count) -> count == (150 + 
originalEventsCount), 1000, 100); + } } diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 27bdd35b3..63150feec 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -19,7 +19,6 @@ package org.apache.unomi.persistence.elasticsearch; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.hazelcast.core.HazelcastInstance; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -49,6 +48,7 @@ import org.apache.unomi.persistence.spi.aggregate.IpRangeAggregate; import org.apache.unomi.persistence.spi.aggregate.NumericRangeAggregate; import org.apache.unomi.persistence.spi.aggregate.TermsAggregate; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest; @@ -103,8 +103,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.*; -import org.elasticsearch.index.reindex.BulkByScrollResponse; -import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.reindex.*; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import 
org.elasticsearch.script.ScriptException; @@ -161,7 +160,6 @@ import java.util.Collections; import java.util.Date; import java.util.Enumeration; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -215,6 +213,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Map<String, String> routingByType; private Integer defaultQueryLimit = 10; + private Integer removeByQueryTimeoutInMinutes = 10; private String itemsMonthlyIndexedOverride = "event,session"; private String bulkProcessorConcurrentRequests = "1"; @@ -235,9 +234,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private int aggregateQueryBucketSize = 5000; private MetricsService metricsService; - private HazelcastInstance hazelcastInstance; - private Set<String> itemClassesToCacheSet = new HashSet<>(); - private String itemClassesToCache; private boolean useBatchingForSave = false; private boolean useBatchingForUpdate = true; private String logLevelRestClient = "ERROR"; @@ -382,23 +378,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.metricsService = metricsService; } - public void setHazelcastInstance(HazelcastInstance hazelcastInstance) { - this.hazelcastInstance = hazelcastInstance; - } - - public void setItemClassesToCache(String itemClassesToCache) { - this.itemClassesToCache = itemClassesToCache; - if (StringUtils.isNotBlank(itemClassesToCache)) { - String[] itemClassesToCacheParts = itemClassesToCache.split(","); - if (itemClassesToCacheParts != null) { - itemClassesToCacheSet.clear(); - for (String itemClassToCache : itemClassesToCacheParts) { - itemClassesToCacheSet.add(itemClassToCache.trim()); - } - } - } - } - public void setUseBatchingForSave(boolean useBatchingForSave) { this.useBatchingForSave = useBatchingForSave; } @@ -789,16 +768,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService, try { String itemType = Item.getItemType(clazz); String className = clazz.getName(); - if (customItemType == null) { - T itemFromCache = getFromCache(itemId, clazz.getName()); - if (itemFromCache != null) { - return itemFromCache; - } - } else { - T itemFromCache = getFromCache(itemId, CustomItem.class.getName() + "." + customItemType); - if (itemFromCache != null) { - return itemFromCache; - } + if (customItemType != null) { className = CustomItem.class.getName() + "." + customItemType; itemType = customItemType; } @@ -828,7 +798,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String sourceAsString = response.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm()); - putInCache(itemId, value, className); return value; } else { return null; @@ -889,7 +858,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, className = CustomItem.class.getName() + "." + itemType; } String itemId = item.getItemId(); - putInCache(itemId, item, className); String index = getIndex(itemType, itemsMonthlyIndexed.contains(itemType) ? ((TimestampedItem) item).getTimeStamp() : null); IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(itemId); @@ -1215,50 +1183,59 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); + final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType)) + .setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query)) + // Setting slices to auto will let Elasticsearch choose the number of slices to use. + // This setting will use one slice per shard, up to a certain limit. 
+ // The delete request will be more efficient and faster than no slicing. + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) + // Elasticsearch takes a snapshot of the index when you hit delete by query request and uses the _version of the documents to process the request. + // If a document gets updated in the meantime, it will result in a version conflict error and the delete operation will fail. + // So we explicitly set the conflict strategy to proceed in case of version conflict. + .setAbortOnVersionConflict(false) + // Remove by Query is mostly used for purge and cleaning up old data + // It's mostly used in jobs/timed tasks so we don't really care about long request + // So we increase default timeout of 1min to 10min + .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes)); + + BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); + + if (bulkByScrollResponse == null) { + logger.error("Remove by query: no response returned for query: {}", query); + return false; + } - BulkRequest deleteByScopeBulkRequest = new BulkRequest(); - - final TimeValue keepAlive = TimeValue.timeValueHours(1); - SearchRequest searchRequest = new SearchRequest(getAllIndexForQuery()) - .indices(getIndexNameForQuery(itemType)) - .scroll(keepAlive); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() - .query(conditionESQueryBuilderDispatcher.getQueryBuilder(query)) - .size(100); - searchRequest.source(searchSourceBuilder); - - SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); + if (bulkByScrollResponse.isTimedOut()) { + logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query); + } - // Scroll until no more hits are returned - while (true) { + if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) || + 
bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { + logger.warn("Remove by query: we found some failure during the process of query: {}", query); - for (SearchHit hit : response.getHits().getHits()) { - // add hit to bulk delete - deleteFromCache(hit.getId(), clazz.getName()); - deleteByScopeBulkRequest.add(Requests.deleteRequest(hit.getIndex()).type(hit.getType()).id(hit.getId())); + if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) { + for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) { + logger.warn("Remove by query, search failure: {}", searchFailure.toString()); + } } - SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId()); - searchScrollRequest.scroll(keepAlive); - response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT); - - // If we have no more hits, exit - if (response.getHits().getHits().length == 0) { - break; + if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { + for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) { + logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString()); + } } } - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(response.getScrollId()); - client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); - - // we're done with the scrolling, delete now - if (deleteByScopeBulkRequest.numberOfActions() > 0) { - final BulkResponse deleteResponse = client.bulk(deleteByScopeBulkRequest, RequestOptions.DEFAULT); - if (deleteResponse.hasFailures()) { - // do something - logger.warn("Couldn't remove by query " + query + ":\n{}", deleteResponse.buildFailureMessage()); - } + if (logger.isDebugEnabled()) { + logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, 
skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}", + bulkByScrollResponse.getTook().toHumanReadableString(1), + bulkByScrollResponse.getDeleted(), + bulkByScrollResponse.getBatches(), + bulkByScrollResponse.getNoops(), + bulkByScrollResponse.getVersionConflicts(), + bulkByScrollResponse.getSearchRetries(), + bulkByScrollResponse.getBulkRetries(), + query); } return true; @@ -2503,40 +2480,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - private <T extends Item> boolean isCacheActiveForClass(String className) { - if (itemClassesToCacheSet.contains("*")) { - return true; - } - if (itemClassesToCacheSet.contains(className)) { - return true; - } - return false; - } - - private <T extends Item> T getFromCache(String itemId, String className) { - if (!isCacheActiveForClass(className)) { - return null; - } - Map<String, T> itemCache = hazelcastInstance.getMap(className); - return itemCache.get(itemId); - } - - private <T extends Item> T putInCache(String itemId, T item, String className) { - if (!isCacheActiveForClass(className)) { - return null; - } - Map<String, T> itemCache = hazelcastInstance.getMap(className); - return itemCache.put(itemId, item); - } - - private <T extends Item> T deleteFromCache(String itemId, String className) { - if (!isCacheActiveForClass(className)) { - return null; - } - Map<String, T> itemCache = hazelcastInstance.getMap(className); - return itemCache.remove(itemId); - } - private String getAllIndexForQuery() { return indexPrefix + "*"; } diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 44e0411e6..e0fdded38 100644 --- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -58,7 +58,6 @@ <cm:property 
name="aggQueryMaxResponseSizeHttp" value="" /> <cm:property name="aggQueryThrowOnMissingDocs" value="false" /> <cm:property name="itemTypeToRefreshPolicy" value="" /> - <cm:property name="itemClassesToCache" value="" /> <cm:property name="useBatchingForSave" value="false" /> <cm:property name="useBatchingForUpdate" value="true" /> @@ -74,7 +73,6 @@ </cm:property-placeholder> <reference id="metricsService" interface="org.apache.unomi.metrics.MetricsService" /> - <reference id="hazelcastInstance" interface="com.hazelcast.core.HazelcastInstance" /> <reference id="scriptExecutor" interface="org.apache.unomi.scripting.ScriptExecutor" /> <service id="elasticSearchPersistenceService" ref="elasticSearchPersistenceServiceImpl"> @@ -137,8 +135,6 @@ <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" /> <property name="metricsService" ref="metricsService" /> - <property name="hazelcastInstance" ref="hazelcastInstance" /> - <property name="itemClassesToCache" value="${es.itemClassesToCache}" /> <property name="useBatchingForSave" value="${es.useBatchingForSave}" /> <property name="useBatchingForUpdate" value="${es.useBatchingForUpdate}" /> diff --git a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java index a11189ae9..a259c1491 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java @@ -198,14 +198,15 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList private SegmentService segmentService; - private Condition purgeProfileQuery; private Integer purgeProfileExistTime = 0; private Integer purgeProfileInactiveTime = 0; private Integer purgeSessionsAndEventsTime = 0; private Integer purgeProfileInterval = 0; + private TimerTask purgeTask = null; private long 
propertiesRefreshInterval = 10000; private PropertyTypes propertyTypes; + private TimerTask propertyTypeLoadTask = null; private boolean forceRefreshOnSave = false; @@ -258,6 +259,12 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList } public void preDestroy() { + if (purgeTask != null) { + purgeTask.cancel(); + } + if (propertyTypeLoadTask != null) { + propertyTypeLoadTask.cancel(); + } bundleContext.removeBundleListener(this); logger.info("Profile service shutdown."); } @@ -290,13 +297,13 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList } private void schedulePropertyTypeLoad() { - TimerTask task = new TimerTask() { + propertyTypeLoadTask = new TimerTask() { @Override public void run() { reloadPropertyTypes(false); } }; - schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS); + schedulerService.getScheduleExecutorService().scheduleAtFixedRate(propertyTypeLoadTask, 10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS); logger.info("Scheduled task for property type loading each 10s"); } @@ -319,74 +326,90 @@ public class ProfileServiceImpl implements ProfileService, SynchronousBundleList } } + @Override + public void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays) { + if (inactiveNumberOfDays > 0 || existsNumberOfDays > 0) { + ConditionType profilePropertyConditionType = definitionsService.getConditionType("profilePropertyCondition"); + ConditionType booleanCondition = definitionsService.getConditionType("booleanCondition"); + if (profilePropertyConditionType == null || booleanCondition == null) { + // definition service not yet fully instantiate + return; + } + + Condition purgeProfileQuery = new Condition(booleanCondition); + purgeProfileQuery.setParameter("operator", "or"); + List<Condition> subConditions = new ArrayList<>(); + + if (inactiveNumberOfDays > 0) { + logger.info("Purging: Profile with no 
visits since {} days", inactiveNumberOfDays); + Condition inactiveTimeCondition = new Condition(profilePropertyConditionType); + inactiveTimeCondition.setParameter("propertyName", "properties.lastVisit"); + inactiveTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo"); + inactiveTimeCondition.setParameter("propertyValueDateExpr", "now-" + inactiveNumberOfDays + "d"); + subConditions.add(inactiveTimeCondition); + } + + if (existsNumberOfDays > 0) { + Condition existTimeCondition = new Condition(profilePropertyConditionType); + logger.info("Purging: Profile created since more than {} days", existsNumberOfDays); + existTimeCondition.setParameter("propertyName", "properties.firstVisit"); + existTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo"); + existTimeCondition.setParameter("propertyValueDateExpr", "now-" + existsNumberOfDays + "d"); + subConditions.add(existTimeCondition); + } + + purgeProfileQuery.setParameter("subConditions", subConditions); + persistenceService.removeByQuery(purgeProfileQuery, Profile.class); + } + } + + @Override + public void purgeMonthlyItems(int existsNumberOfMonths) { + if (existsNumberOfMonths > 0) { + logger.info("Purging: Monthly items (sessions/events) created before {} months", existsNumberOfMonths); + persistenceService.purge(getMonth(-existsNumberOfMonths).getTime()); + } + } + private void initializePurge() { - logger.info("Profile purge: Initializing"); + logger.info("Purge: Initializing"); if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 || purgeSessionsAndEventsTime > 0) { if (purgeProfileInactiveTime > 0) { - logger.info("Profile purge: Profile with no visits since {} days, will be purged", purgeProfileInactiveTime); + logger.info("Purge: Profile with no visits since more than {} days, will be purged", purgeProfileInactiveTime); } if (purgeProfileExistTime > 0) { - logger.info("Profile purge: Profile created since {} days, will be purged", purgeProfileExistTime); + 
logger.info("Purge: Profile created since more than {} days, will be purged", purgeProfileExistTime); + } + if (purgeSessionsAndEventsTime > 0) { + logger.info("Purge: Monthly items (sessions/events) created since more than {} months, will be purged", purgeSessionsAndEventsTime); } - TimerTask task = new TimerTask() { + purgeTask = new TimerTask() { @Override public void run() { try { long purgeStartTime = System.currentTimeMillis(); - logger.debug("Profile purge: Purge triggered"); - - if (purgeProfileQuery == null) { - ConditionType profilePropertyConditionType = definitionsService.getConditionType("profilePropertyCondition"); - ConditionType booleanCondition = definitionsService.getConditionType("booleanCondition"); - if (profilePropertyConditionType == null || booleanCondition == null) { - // definition service not yet fully instantiate - return; - } + logger.info("Purge: triggered"); - purgeProfileQuery = new Condition(booleanCondition); - purgeProfileQuery.setParameter("operator", "or"); - List<Condition> subConditions = new ArrayList<>(); + // Profile purge + purgeProfiles(purgeProfileInactiveTime, purgeProfileExistTime); - if (purgeProfileInactiveTime > 0) { - Condition inactiveTimeCondition = new Condition(profilePropertyConditionType); - inactiveTimeCondition.setParameter("propertyName", "properties.lastVisit"); - inactiveTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo"); - inactiveTimeCondition.setParameter("propertyValueDateExpr", "now-" + purgeProfileInactiveTime + "d"); - subConditions.add(inactiveTimeCondition); - } - - if (purgeProfileExistTime > 0) { - Condition existTimeCondition = new Condition(profilePropertyConditionType); - existTimeCondition.setParameter("propertyName", "properties.firstVisit"); - existTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo"); - existTimeCondition.setParameter("propertyValueDateExpr", "now-" + purgeProfileExistTime + "d"); - subConditions.add(existTimeCondition); - } + // 
Monthly items purge + purgeMonthlyItems(purgeSessionsAndEventsTime); - purgeProfileQuery.setParameter("subConditions", subConditions); - } - - persistenceService.removeByQuery(purgeProfileQuery, Profile.class); - - if (purgeSessionsAndEventsTime > 0) { - logger.info("Monthly indexes purge: Session and events created before {} months, will be purged", - purgeSessionsAndEventsTime); - persistenceService.purge(getMonth(-purgeSessionsAndEventsTime).getTime()); - } - - logger.info("Profile purge: purge executed in {} ms", System.currentTimeMillis() - purgeStartTime); + logger.info("Purge: executed in {} ms", System.currentTimeMillis() - purgeStartTime); } catch (Throwable t) { - logger.error("Error while purging profiles", t); + logger.error("Error while purging", t); } } }; - schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 1, purgeProfileInterval, TimeUnit.DAYS); - logger.info("Profile purge: purge scheduled with an interval of {} days", purgeProfileInterval); + schedulerService.getScheduleExecutorService().scheduleAtFixedRate(purgeTask, 1, purgeProfileInterval, TimeUnit.DAYS); + + logger.info("Purge: purge scheduled with an interval of {} days", purgeProfileInterval); } else { - logger.info("Profile purge: No purge scheduled"); + logger.info("Purge: No purge scheduled"); } }
