This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new 421d7fd40 UNOMI-670: improve purge system (#526)
421d7fd40 is described below
commit 421d7fd407a32cd6f5117812647172a3e7251d16
Author: kevan Jahanshahi <[email protected]>
AuthorDate: Fri Oct 28 09:30:47 2022 +0200
UNOMI-670: improve purge system (#526)
---
.../apache/unomi/api/services/ProfileService.java | 19 +++
.../org/apache/unomi/itests/ProfileServiceIT.java | 141 ++++++++++++++++++-
.../ElasticSearchPersistenceServiceImpl.java | 155 +++++++--------------
.../resources/OSGI-INF/blueprint/blueprint.xml | 4 -
.../services/impl/profiles/ProfileServiceImpl.java | 123 +++++++++-------
5 files changed, 277 insertions(+), 165 deletions(-)
diff --git
a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
index e51091961..86fac9f61 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ProfileService.java
@@ -400,4 +400,23 @@ public interface ProfileService {
* in specific scenarios such as integration tests.
*/
void refresh();
+
+ /**
+ * Purge (delete) profiles
+ * example: purge profiles that have been inactive for 10 days:
+ * purgeProfiles(10, 0);
+ *
+ * example: purge profiles created more than 30 days ago:
+ * purgeProfiles(0, 30);
+ *
+ * @param inactiveNumberOfDays will purge profiles with no visits for
this number of days (0 or a negative value will have no effect)
+ * @param existsNumberOfDays will purge profiles created more than this number
of days ago (0 or a negative value will have no effect)
+ */
+ void purgeProfiles(int inactiveNumberOfDays, int existsNumberOfDays);
+
+ /**
+ * Purge (delete) monthly indices by removing old indices
+ * @param existsNumberOfMonths used to remove monthly indices older than
this number of months
+ */
+ void purgeMonthlyItems(int existsNumberOfMonths);
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index c85a83a59..30bf8dcf1 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -36,10 +36,9 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
@@ -65,7 +64,7 @@ public class ProfileServiceIT extends BaseIT {
@After
public void tearDown() throws InterruptedException {
- removeItems(Profile.class, ProfileAlias.class);
+ removeItems(Profile.class, ProfileAlias.class, Event.class,
Session.class);
}
@Test
@@ -266,4 +265,136 @@ public class ProfileServiceIT extends BaseIT {
// do nothing, it's expected
}
}
+
+ @Test
+ public void testProfilePurge() throws Exception {
+ Date currentDate = new Date();
+ LocalDateTime minus10Days =
LocalDateTime.ofInstant(currentDate.toInstant(),
ZoneId.systemDefault()).minusDays(10);
+ LocalDateTime minus30Days =
LocalDateTime.ofInstant(currentDate.toInstant(),
ZoneId.systemDefault()).minusDays(30);
+ Date currentDateMinus10Days =
Date.from(minus10Days.atZone(ZoneId.systemDefault()).toInstant());
+ Date currentDateMinus30Days =
Date.from(minus30Days.atZone(ZoneId.systemDefault()).toInstant());
+
+ long originalProfilesCount =
persistenceService.getAllItemsCount(Profile.ITEM_TYPE);
+
+ // create profiles that have been inactive for 10 days
+ for (int i = 0; i < 150; i++) {
+ Profile profile = new Profile("inactive-profile-to-be-purge-" + i);
+ profile.setProperty("lastVisit", currentDateMinus10Days);
+ profile.setProperty("firstVisit", currentDateMinus10Days);
+ persistenceService.save(profile);
+ }
+
+ // create active profiles created 30 days ago
+ for (int i = 0; i < 150; i++) {
+ Profile profile = new Profile("old-profile-to-be-purge-" + i);
+ profile.setProperty("lastVisit", currentDate);
+ profile.setProperty("firstVisit", currentDateMinus30Days);
+ persistenceService.save(profile);
+ }
+
+ // create active and recent profiles
+ for (int i = 0; i < 150; i++) {
+ Profile profile = new Profile("active-profile" + i);
+ profile.setProperty("lastVisit", currentDate);
+ profile.setProperty("firstVisit", currentDate);
+ persistenceService.save(profile);
+ }
+
+ keepTrying("Failed waiting for all profiles to be available", () ->
profileService.getAllProfilesCount(),
+ (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+ // Try purge with 0 params: should have no effects
+ profileService.purgeProfiles(0, 0);
+ keepTrying("We should still have 450 profiles", () ->
profileService.getAllProfilesCount(),
+ (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+ // Try to purge profiles inactive for 20 days: should have no effect, as
there are no such profiles
+ profileService.purgeProfiles(20, 0);
+ keepTrying("We should still have 450 profiles", () ->
profileService.getAllProfilesCount(),
+ (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+ // Try to purge profiles inactive for 20 days and/or older than 40
days: should have no effect, as there are no such profiles
+ profileService.purgeProfiles(20, 40);
+ keepTrying("We should still have 450 profiles", () ->
profileService.getAllProfilesCount(),
+ (count) -> count == (450 + originalProfilesCount), 1000, 100);
+
+ // Try purge inactive profiles since 5 days
+ profileService.purgeProfiles(5, 0);
+ keepTrying("Inactive profiles should be purge so we should have 300
profiles now", () -> profileService.getAllProfilesCount(),
+ (count) -> count == (300 + originalProfilesCount), 1000, 100);
+
+ // Try purge inactive profiles since 5 days and/or older than 25 days
+ profileService.purgeProfiles(5, 25);
+ keepTrying("Older profiles should be purge so we should have 150
profiles now", () -> profileService.getAllProfilesCount(),
+ (count) -> count == (150 + originalProfilesCount), 1000, 100);
+ }
+
+ @Test
+ public void testMonthlyIndicesPurge() throws Exception {
+ Date currentDate = new Date();
+ LocalDateTime minus10Months =
LocalDateTime.ofInstant(currentDate.toInstant(),
ZoneId.systemDefault()).minusMonths(10);
+ LocalDateTime minus30Months =
LocalDateTime.ofInstant(currentDate.toInstant(),
ZoneId.systemDefault()).minusMonths(30);
+ Date currentDateMinus10Months =
Date.from(minus10Months.atZone(ZoneId.systemDefault()).toInstant());
+ Date currentDateMinus30Months =
Date.from(minus30Months.atZone(ZoneId.systemDefault()).toInstant());
+
+ long originalSessionsCount =
persistenceService.getAllItemsCount(Session.ITEM_TYPE);
+ long originalEventsCount =
persistenceService.getAllItemsCount(Event.ITEM_TYPE);
+
+ Profile profile = new Profile("dummy-profile-monthly-purge-test");
+ persistenceService.save(profile);
+
+ // create 10 months old items
+ for (int i = 0; i < 150; i++) {
+ Session session = new Session("10months-old-session-" + i,
profile, currentDateMinus10Months, "dummy-scope");
+ persistenceService.save(session);
+ persistenceService.save(new Event("10months-old-event-" + i,
"view", session, profile, "dummy-scope", null, null, currentDateMinus10Months));
+ }
+
+ // create 30 months old items
+ for (int i = 0; i < 150; i++) {
+ Session session = new Session("30months-old-session-" + i,
profile, currentDateMinus30Months, "dummy-scope");
+ persistenceService.save(session);
+ persistenceService.save(new Event("30months-old-event-" + i,
"view", session, profile, "dummy-scope", null, null, currentDateMinus30Months));
+ }
+
+ // create recent items
+ for (int i = 0; i < 150; i++) {
+ Session session = new Session("recent-session-" + i, profile,
currentDate, "dummy-scope");
+ persistenceService.save(session);
+ persistenceService.save(new Event("recent-event-" + i, "view",
session, profile, "dummy-scope", null, null, currentDate));
+ }
+
+ keepTrying("Sessions number should be 450", () ->
persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (450 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 450", () ->
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+ // Should have no effect
+ profileService.purgeMonthlyItems(0);
+ keepTrying("Sessions number should be 450", () ->
persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (450 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 450", () ->
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+ // Should have no effect, as there are no monthly items older than 40 months
+ profileService.purgeMonthlyItems(40);
+ keepTrying("Sessions number should be 450", () ->
persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (450 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 450", () ->
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (450 + originalEventsCount), 1000, 100);
+
+ // Should purge monthly items older than 25 months
+ profileService.purgeMonthlyItems(25);
+ keepTrying("Sessions number should be 300", () ->
persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (300 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 300", () ->
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (300 + originalEventsCount), 1000, 100);
+
+ // Should purge monthly items older than 5 months
+ profileService.purgeMonthlyItems(5);
+ keepTrying("Sessions number should be 150", () ->
persistenceService.getAllItemsCount(Session.ITEM_TYPE),
+ (count) -> count == (150 + originalSessionsCount), 1000, 100);
+ keepTrying("Events number should be 150", () ->
persistenceService.getAllItemsCount(Event.ITEM_TYPE),
+ (count) -> count == (150 + originalEventsCount), 1000, 100);
+ }
}
diff --git
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 27bdd35b3..63150feec 100644
---
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.unomi.persistence.elasticsearch;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hazelcast.core.HazelcastInstance;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@@ -49,6 +48,7 @@ import
org.apache.unomi.persistence.spi.aggregate.IpRangeAggregate;
import org.apache.unomi.persistence.spi.aggregate.NumericRangeAggregate;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import
org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
@@ -103,8 +103,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.*;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+import org.elasticsearch.index.reindex.*;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptException;
@@ -161,7 +160,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -215,6 +213,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private Map<String, String> routingByType;
private Integer defaultQueryLimit = 10;
+ private Integer removeByQueryTimeoutInMinutes = 10;
private String itemsMonthlyIndexedOverride = "event,session";
private String bulkProcessorConcurrentRequests = "1";
@@ -235,9 +234,6 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private int aggregateQueryBucketSize = 5000;
private MetricsService metricsService;
- private HazelcastInstance hazelcastInstance;
- private Set<String> itemClassesToCacheSet = new HashSet<>();
- private String itemClassesToCache;
private boolean useBatchingForSave = false;
private boolean useBatchingForUpdate = true;
private String logLevelRestClient = "ERROR";
@@ -382,23 +378,6 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
this.metricsService = metricsService;
}
- public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
- this.hazelcastInstance = hazelcastInstance;
- }
-
- public void setItemClassesToCache(String itemClassesToCache) {
- this.itemClassesToCache = itemClassesToCache;
- if (StringUtils.isNotBlank(itemClassesToCache)) {
- String[] itemClassesToCacheParts = itemClassesToCache.split(",");
- if (itemClassesToCacheParts != null) {
- itemClassesToCacheSet.clear();
- for (String itemClassToCache : itemClassesToCacheParts) {
- itemClassesToCacheSet.add(itemClassToCache.trim());
- }
- }
- }
- }
-
public void setUseBatchingForSave(boolean useBatchingForSave) {
this.useBatchingForSave = useBatchingForSave;
}
@@ -789,16 +768,7 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
try {
String itemType = Item.getItemType(clazz);
String className = clazz.getName();
- if (customItemType == null) {
- T itemFromCache = getFromCache(itemId,
clazz.getName());
- if (itemFromCache != null) {
- return itemFromCache;
- }
- } else {
- T itemFromCache = getFromCache(itemId,
CustomItem.class.getName() + "." + customItemType);
- if (itemFromCache != null) {
- return itemFromCache;
- }
+ if (customItemType != null) {
className = CustomItem.class.getName() + "." +
customItemType;
itemType = customItemType;
}
@@ -828,7 +798,6 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
String sourceAsString =
response.getSourceAsString();
final T value =
ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
setMetadata(value, response.getId(),
response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
- putInCache(itemId, value, className);
return value;
} else {
return null;
@@ -889,7 +858,6 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
className = CustomItem.class.getName() + "." +
itemType;
}
String itemId = item.getItemId();
- putInCache(itemId, item, className);
String index = getIndex(itemType,
itemsMonthlyIndexed.contains(itemType) ? ((TimestampedItem)
item).getTimeStamp() : null);
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(itemId);
@@ -1215,50 +1183,59 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
+ final DeleteByQueryRequest deleteByQueryRequest = new
DeleteByQueryRequest(getIndexNameForQuery(itemType))
+
.setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
+ // Setting slices to auto will let Elasticsearch
choose the number of slices to use.
+ // This setting will use one slice per shard, up
to a certain limit.
+ // The delete request will be more efficient and
faster than no slicing.
+ .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+ // Elasticsearch takes a snapshot of the index
when you hit delete by query request and uses the _version of the documents to
process the request.
+ // If a document gets updated in the meantime, it
will result in a version conflict error and the delete operation will fail.
+ // So we explicitly set the conflict strategy to
proceed in case of version conflict.
+ .setAbortOnVersionConflict(false)
+ // Remove by Query is mostly used for purge and
cleaning up old data
+ // It's mostly used in jobs/timed tasks so we
don't really care about long request
+ // So we increase default timeout of 1min to 10min
+
.setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
+
+ BulkByScrollResponse bulkByScrollResponse =
client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+
+ if (bulkByScrollResponse == null) {
+ logger.error("Remove by query: no response returned
for query: {}", query);
+ return false;
+ }
- BulkRequest deleteByScopeBulkRequest = new BulkRequest();
-
- final TimeValue keepAlive = TimeValue.timeValueHours(1);
- SearchRequest searchRequest = new
SearchRequest(getAllIndexForQuery())
- .indices(getIndexNameForQuery(itemType))
- .scroll(keepAlive);
- SearchSourceBuilder searchSourceBuilder = new
SearchSourceBuilder()
-
.query(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
- .size(100);
- searchRequest.source(searchSourceBuilder);
-
- SearchResponse response = client.search(searchRequest,
RequestOptions.DEFAULT);
+ if (bulkByScrollResponse.isTimedOut()) {
+ logger.warn("Remove by query: timed out because took
more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query);
+ }
- // Scroll until no more hits are returned
- while (true) {
+ if ((bulkByScrollResponse.getSearchFailures() != null &&
bulkByScrollResponse.getSearchFailures().size() > 0) ||
+ bulkByScrollResponse.getBulkFailures() != null &&
bulkByScrollResponse.getBulkFailures().size() > 0) {
+ logger.warn("Remove by query: we found some failure
during the process of query: {}", query);
- for (SearchHit hit : response.getHits().getHits()) {
- // add hit to bulk delete
- deleteFromCache(hit.getId(), clazz.getName());
-
deleteByScopeBulkRequest.add(Requests.deleteRequest(hit.getIndex()).type(hit.getType()).id(hit.getId()));
+ if (bulkByScrollResponse.getSearchFailures() != null
&& bulkByScrollResponse.getSearchFailures().size() > 0) {
+ for (ScrollableHitSource.SearchFailure
searchFailure : bulkByScrollResponse.getSearchFailures()) {
+ logger.warn("Remove by query, search failure:
{}", searchFailure.toString());
+ }
}
- SearchScrollRequest searchScrollRequest = new
SearchScrollRequest(response.getScrollId());
- searchScrollRequest.scroll(keepAlive);
- response = client.scroll(searchScrollRequest,
RequestOptions.DEFAULT);
-
- // If we have no more hits, exit
- if (response.getHits().getHits().length == 0) {
- break;
+ if (bulkByScrollResponse.getBulkFailures() != null &&
bulkByScrollResponse.getBulkFailures().size() > 0) {
+ for (BulkItemResponse.Failure bulkFailure :
bulkByScrollResponse.getBulkFailures()) {
+ logger.warn("Remove by query, bulk failure:
{}", bulkFailure.toString());
+ }
}
}
- ClearScrollRequest clearScrollRequest = new
ClearScrollRequest();
- clearScrollRequest.addScrollId(response.getScrollId());
- client.clearScroll(clearScrollRequest,
RequestOptions.DEFAULT);
-
- // we're done with the scrolling, delete now
- if (deleteByScopeBulkRequest.numberOfActions() > 0) {
- final BulkResponse deleteResponse =
client.bulk(deleteByScopeBulkRequest, RequestOptions.DEFAULT);
- if (deleteResponse.hasFailures()) {
- // do something
- logger.warn("Couldn't remove by query " + query +
":\n{}", deleteResponse.buildFailureMessage());
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Remove by query: took {}, deleted docs:
{}, batches executed: {}, skipped docs: {}, version conflicts: {}, search
retries: {}, bulk retries: {}, for query: {}",
+
bulkByScrollResponse.getTook().toHumanReadableString(1),
+ bulkByScrollResponse.getDeleted(),
+ bulkByScrollResponse.getBatches(),
+ bulkByScrollResponse.getNoops(),
+ bulkByScrollResponse.getVersionConflicts(),
+ bulkByScrollResponse.getSearchRetries(),
+ bulkByScrollResponse.getBulkRetries(),
+ query);
}
return true;
@@ -2503,40 +2480,6 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
- private <T extends Item> boolean isCacheActiveForClass(String className) {
- if (itemClassesToCacheSet.contains("*")) {
- return true;
- }
- if (itemClassesToCacheSet.contains(className)) {
- return true;
- }
- return false;
- }
-
- private <T extends Item> T getFromCache(String itemId, String className) {
- if (!isCacheActiveForClass(className)) {
- return null;
- }
- Map<String, T> itemCache = hazelcastInstance.getMap(className);
- return itemCache.get(itemId);
- }
-
- private <T extends Item> T putInCache(String itemId, T item, String
className) {
- if (!isCacheActiveForClass(className)) {
- return null;
- }
- Map<String, T> itemCache = hazelcastInstance.getMap(className);
- return itemCache.put(itemId, item);
- }
-
- private <T extends Item> T deleteFromCache(String itemId, String
className) {
- if (!isCacheActiveForClass(className)) {
- return null;
- }
- Map<String, T> itemCache = hazelcastInstance.getMap(className);
- return itemCache.remove(itemId);
- }
-
private String getAllIndexForQuery() {
return indexPrefix + "*";
}
diff --git
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 44e0411e6..e0fdded38 100644
---
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -58,7 +58,6 @@
<cm:property name="aggQueryMaxResponseSizeHttp" value="" />
<cm:property name="aggQueryThrowOnMissingDocs" value="false" />
<cm:property name="itemTypeToRefreshPolicy" value="" />
- <cm:property name="itemClassesToCache" value="" />
<cm:property name="useBatchingForSave" value="false" />
<cm:property name="useBatchingForUpdate" value="true" />
@@ -74,7 +73,6 @@
</cm:property-placeholder>
<reference id="metricsService"
interface="org.apache.unomi.metrics.MetricsService" />
- <reference id="hazelcastInstance"
interface="com.hazelcast.core.HazelcastInstance" />
<reference id="scriptExecutor"
interface="org.apache.unomi.scripting.ScriptExecutor" />
<service id="elasticSearchPersistenceService"
ref="elasticSearchPersistenceServiceImpl">
@@ -137,8 +135,6 @@
<property name="clientSocketTimeout" value="${es.clientSocketTimeout}"
/>
<property name="metricsService" ref="metricsService" />
- <property name="hazelcastInstance" ref="hazelcastInstance" />
- <property name="itemClassesToCache" value="${es.itemClassesToCache}" />
<property name="useBatchingForSave" value="${es.useBatchingForSave}" />
<property name="useBatchingForUpdate"
value="${es.useBatchingForUpdate}" />
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
index a11189ae9..a259c1491 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/profiles/ProfileServiceImpl.java
@@ -198,14 +198,15 @@ public class ProfileServiceImpl implements
ProfileService, SynchronousBundleList
private SegmentService segmentService;
- private Condition purgeProfileQuery;
private Integer purgeProfileExistTime = 0;
private Integer purgeProfileInactiveTime = 0;
private Integer purgeSessionsAndEventsTime = 0;
private Integer purgeProfileInterval = 0;
+ private TimerTask purgeTask = null;
private long propertiesRefreshInterval = 10000;
private PropertyTypes propertyTypes;
+ private TimerTask propertyTypeLoadTask = null;
private boolean forceRefreshOnSave = false;
@@ -258,6 +259,12 @@ public class ProfileServiceImpl implements ProfileService,
SynchronousBundleList
}
public void preDestroy() {
+ if (purgeTask != null) {
+ purgeTask.cancel();
+ }
+ if (propertyTypeLoadTask != null) {
+ propertyTypeLoadTask.cancel();
+ }
bundleContext.removeBundleListener(this);
logger.info("Profile service shutdown.");
}
@@ -290,13 +297,13 @@ public class ProfileServiceImpl implements
ProfileService, SynchronousBundleList
}
private void schedulePropertyTypeLoad() {
- TimerTask task = new TimerTask() {
+ propertyTypeLoadTask = new TimerTask() {
@Override
public void run() {
reloadPropertyTypes(false);
}
};
-
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 10000,
propertiesRefreshInterval, TimeUnit.MILLISECONDS);
+
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(propertyTypeLoadTask,
10000, propertiesRefreshInterval, TimeUnit.MILLISECONDS);
logger.info("Scheduled task for property type loading each 10s");
}
@@ -319,74 +326,90 @@ public class ProfileServiceImpl implements
ProfileService, SynchronousBundleList
}
}
+ @Override
+ public void purgeProfiles(int inactiveNumberOfDays, int
existsNumberOfDays) {
+ if (inactiveNumberOfDays > 0 || existsNumberOfDays > 0) {
+ ConditionType profilePropertyConditionType =
definitionsService.getConditionType("profilePropertyCondition");
+ ConditionType booleanCondition =
definitionsService.getConditionType("booleanCondition");
+ if (profilePropertyConditionType == null || booleanCondition ==
null) {
+ // definition service is not yet fully instantiated
+ return;
+ }
+
+ Condition purgeProfileQuery = new Condition(booleanCondition);
+ purgeProfileQuery.setParameter("operator", "or");
+ List<Condition> subConditions = new ArrayList<>();
+
+ if (inactiveNumberOfDays > 0) {
+ logger.info("Purging: Profile with no visits since {} days",
inactiveNumberOfDays);
+ Condition inactiveTimeCondition = new
Condition(profilePropertyConditionType);
+ inactiveTimeCondition.setParameter("propertyName",
"properties.lastVisit");
+ inactiveTimeCondition.setParameter("comparisonOperator",
"lessThanOrEqualTo");
+ inactiveTimeCondition.setParameter("propertyValueDateExpr",
"now-" + inactiveNumberOfDays + "d");
+ subConditions.add(inactiveTimeCondition);
+ }
+
+ if (existsNumberOfDays > 0) {
+ Condition existTimeCondition = new
Condition(profilePropertyConditionType);
+ logger.info("Purging: Profile created since more than {}
days", existsNumberOfDays);
+ existTimeCondition.setParameter("propertyName",
"properties.firstVisit");
+ existTimeCondition.setParameter("comparisonOperator",
"lessThanOrEqualTo");
+ existTimeCondition.setParameter("propertyValueDateExpr",
"now-" + existsNumberOfDays + "d");
+ subConditions.add(existTimeCondition);
+ }
+
+ purgeProfileQuery.setParameter("subConditions", subConditions);
+ persistenceService.removeByQuery(purgeProfileQuery, Profile.class);
+ }
+ }
+
+ @Override
+ public void purgeMonthlyItems(int existsNumberOfMonths) {
+ if (existsNumberOfMonths > 0) {
+ logger.info("Purging: Monthly items (sessions/events) created
before {} months", existsNumberOfMonths);
+
persistenceService.purge(getMonth(-existsNumberOfMonths).getTime());
+ }
+ }
+
private void initializePurge() {
- logger.info("Profile purge: Initializing");
+ logger.info("Purge: Initializing");
if (purgeProfileInactiveTime > 0 || purgeProfileExistTime > 0 ||
purgeSessionsAndEventsTime > 0) {
if (purgeProfileInactiveTime > 0) {
- logger.info("Profile purge: Profile with no visits since {}
days, will be purged", purgeProfileInactiveTime);
+ logger.info("Purge: Profile with no visits since more than {}
days, will be purged", purgeProfileInactiveTime);
}
if (purgeProfileExistTime > 0) {
- logger.info("Profile purge: Profile created since {} days,
will be purged", purgeProfileExistTime);
+ logger.info("Purge: Profile created since more than {} days,
will be purged", purgeProfileExistTime);
+ }
+ if (purgeSessionsAndEventsTime > 0) {
+ logger.info("Purge: Monthly items (sessions/events) created
since more than {} months, will be purged", purgeSessionsAndEventsTime);
}
- TimerTask task = new TimerTask() {
+ purgeTask = new TimerTask() {
@Override
public void run() {
try {
long purgeStartTime = System.currentTimeMillis();
- logger.debug("Profile purge: Purge triggered");
-
- if (purgeProfileQuery == null) {
- ConditionType profilePropertyConditionType =
definitionsService.getConditionType("profilePropertyCondition");
- ConditionType booleanCondition =
definitionsService.getConditionType("booleanCondition");
- if (profilePropertyConditionType == null ||
booleanCondition == null) {
- // definition service not yet fully instantiate
- return;
- }
+ logger.info("Purge: triggered");
- purgeProfileQuery = new
Condition(booleanCondition);
- purgeProfileQuery.setParameter("operator", "or");
- List<Condition> subConditions = new ArrayList<>();
+ // Profile purge
+ purgeProfiles(purgeProfileInactiveTime,
purgeProfileExistTime);
- if (purgeProfileInactiveTime > 0) {
- Condition inactiveTimeCondition = new
Condition(profilePropertyConditionType);
-
inactiveTimeCondition.setParameter("propertyName", "properties.lastVisit");
-
inactiveTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
-
inactiveTimeCondition.setParameter("propertyValueDateExpr", "now-" +
purgeProfileInactiveTime + "d");
- subConditions.add(inactiveTimeCondition);
- }
-
- if (purgeProfileExistTime > 0) {
- Condition existTimeCondition = new
Condition(profilePropertyConditionType);
-
existTimeCondition.setParameter("propertyName", "properties.firstVisit");
-
existTimeCondition.setParameter("comparisonOperator", "lessThanOrEqualTo");
-
existTimeCondition.setParameter("propertyValueDateExpr", "now-" +
purgeProfileExistTime + "d");
- subConditions.add(existTimeCondition);
- }
+ // Monthly items purge
+ purgeMonthlyItems(purgeSessionsAndEventsTime);
- purgeProfileQuery.setParameter("subConditions",
subConditions);
- }
-
- persistenceService.removeByQuery(purgeProfileQuery,
Profile.class);
-
- if (purgeSessionsAndEventsTime > 0) {
- logger.info("Monthly indexes purge: Session and
events created before {} months, will be purged",
- purgeSessionsAndEventsTime);
-
persistenceService.purge(getMonth(-purgeSessionsAndEventsTime).getTime());
- }
-
- logger.info("Profile purge: purge executed in {} ms",
System.currentTimeMillis() - purgeStartTime);
+ logger.info("Purge: executed in {} ms",
System.currentTimeMillis() - purgeStartTime);
} catch (Throwable t) {
- logger.error("Error while purging profiles", t);
+ logger.error("Error while purging", t);
}
}
};
-
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 1,
purgeProfileInterval, TimeUnit.DAYS);
- logger.info("Profile purge: purge scheduled with an interval of {}
days", purgeProfileInterval);
+
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(purgeTask, 1,
purgeProfileInterval, TimeUnit.DAYS);
+
+ logger.info("Purge: purge scheduled with an interval of {} days",
purgeProfileInterval);
} else {
- logger.info("Profile purge: No purge scheduled");
+ logger.info("Purge: No purge scheduled");
}
}