This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch UNOMI-462-past-events in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 077384a29e0f2f811a95e9b684d0d3bde33301e0
Author: Kevan <[email protected]>
AuthorDate: Wed Sep 15 11:45:50 2021 +0200

    Fix recalculation of Segments containing past event conditions, also provide ITest
---
 .../apache/unomi/api/services/SegmentService.java  |  11 ++
 itests/pom.xml                                     |   1 +
 .../java/org/apache/unomi/itests/SegmentIT.java    |  64 +++++++++++
 .../apache/unomi/itests/tools/RetriableHelper.java |  66 +++++++++++
 .../PastEventConditionESQueryBuilder.java          |  25 ++--
 .../services/impl/segments/SegmentServiceImpl.java | 126 +++++++++++++++++----
 6 files changed, 254 insertions(+), 39 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
index d1dc440..a54489b 100644
--- a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java
@@ -226,4 +226,15 @@ public interface SegmentService {
      * @return a String representing the condition and parent condition uniquelly
      */
     String getGeneratedPropertyKey(Condition condition, Condition parentCondition);
+
+    /**
+     * Recalculates the past event conditions of the existing rules.
+     * This operation can be heavy and take time; it will:
+     * - browse the existing rules to extract their past event conditions,
+     * - query the events matching those conditions,
+     * - update the event occurrence counts on the corresponding profiles,
+     * - re-evaluate the segments linked to these rules to engage/disengage profiles once the counts have been updated.
+     * Use it carefully, or execute this method in a dedicated thread.
+     */
+    void recalculatePastEventConditions();
 }
diff --git a/itests/pom.xml b/itests/pom.xml
index c022536..e3bec40 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -183,6 +183,7 @@
                             <instanceSettings>
                                 <properties>
                                     <cluster.routing.allocation.disk.threshold_enabled>false</cluster.routing.allocation.disk.threshold_enabled>
+                                    <http.cors.allow-origin>*</http.cors.allow-origin>
                                 </properties>
                             </instanceSettings>
                         </configuration>
diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index 207bd93..87ca3fd 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -26,6 +26,7 @@ import org.apache.unomi.api.services.EventService;
 import org.apache.unomi.api.services.ProfileService;
 import org.apache.unomi.api.services.SegmentService;
 import org.apache.unomi.api.exceptions.BadSegmentConditionException;
+import org.apache.unomi.itests.tools.RetriableHelper;
 import org.apache.unomi.persistence.spi.PersistenceService;
 import org.junit.After;
 import org.junit.Assert;
@@ -44,6 +45,7 @@ import java.time.LocalDate;
 import java.time.ZoneId;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 @RunWith(PaxExam.class)
 @ExamReactorStrategy(PerSuite.class)
@@ -180,4 +182,66 @@ public class SegmentIT extends BaseIT {
         profile = profileService.load("test_profile_id");
         Assert.assertTrue("Profile should be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
     }
+
+    @Test
+    public void testSegmentPastEventRecalculation() throws Exception {
+        // create Profile
+        Profile profile = new Profile();
+        profile.setItemId("test_profile_id");
+        profileService.save(profile);
+        persistenceService.refreshIndex(Profile.class, null); // wait for profile to be fully persisted and indexed
+
+        // create the segment
+        Metadata segmentMetadata = new Metadata("past-event-segment-test");
+        Segment segment = new Segment(segmentMetadata);
+        Condition segmentCondition = new Condition(definitionsService.getConditionType("pastEventCondition"));
+        segmentCondition.setParameter("numberOfDays", 10);
+        Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition"));
+        pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+        segmentCondition.setParameter("eventCondition", pastEventEventCondition);
+        segment.setCondition(segmentCondition);
+        segmentService.setSegmentDefinition(segment);
+        Thread.sleep(5000);
+
+        // Persist the event directly (do not send it into the system, so that it is not processed by the rules)
+        ZoneId defaultZoneId = ZoneId.systemDefault();
+        LocalDate localDate = LocalDate.now().minusDays(3);
+        Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+        testEvent.setPersistent(true);
+        persistenceService.save(testEvent, null, true);
+        persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+
+        // ensure the profile is not yet engaged, since we saved the event directly in ES
+        profile = profileService.load("test_profile_id");
+        Assert.assertFalse("Profile should not be engaged in the segment", profile.getSegments().contains("past-event-segment-test"));
+
+        // now recalculate the past event conditions
+        segmentService.recalculatePastEventConditions();
+        persistenceService.refreshIndex(Profile.class, null);
+        new RetriableHelper<>("testSegmentPastEventRecalculation profile engaged", 20, 1000, () -> {
+            Profile updatedProfile = profileService.load("test_profile_id");
+            if (!updatedProfile.getSegments().contains("past-event-segment-test")) {
+                throw new RuntimeException("Profile should be engaged in the segment, will retry or fail if retry reach the limit");
+            }
+            return updatedProfile;
+        }).call();
+
+        // replace the event with one dated outside the past event condition's time window
+        removeItems(Event.class);
+        localDate = LocalDate.now().minusDays(15);
+        testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+        persistenceService.save(testEvent);
+        persistenceService.refreshIndex(Event.class, new Date()); // wait for event to be fully persisted and indexed
+
+        // now recalculate the past event conditions
+        segmentService.recalculatePastEventConditions();
+        persistenceService.refreshIndex(Profile.class, null);
+        new RetriableHelper<>("testSegmentPastEventRecalculation profile not engaged anymore", 20, 1000, () -> {
+            Profile updatedProfile = profileService.load("test_profile_id");
+            if (updatedProfile.getSegments().contains("past-event-segment-test")) {
+                throw new RuntimeException("Profile should not be engaged in the segment anymore, will retry or fail if retry reach the limit");
+            }
+            return updatedProfile;
+        }).call();
+    }
 }
diff --git a/itests/src/test/java/org/apache/unomi/itests/tools/RetriableHelper.java b/itests/src/test/java/org/apache/unomi/itests/tools/RetriableHelper.java
new file mode 100644
index 0000000..540a7db
--- /dev/null
+++ b/itests/src/test/java/org/apache/unomi/itests/tools/RetriableHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.unomi.itests.tools;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+/**
+ * Utility class that retries a task in integration tests until it succeeds or the number of tries is exhausted.
+ * Useful when waiting for something to be indexed in ES, for example: keep retrying until the object is available.
+ * @param <T> The type of object you are expecting to return from this retry instance
+ */
+public class RetriableHelper<T> implements Callable<T> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RetriableHelper.class);
+
+    private final Callable<T> task;
+    private final long timeToWait;
+    private final String key;
+
+    private int numberOfTriesLeft;
+
+
+    public RetriableHelper(String key, int numberOfRetries, long timeToWait, Callable<T> task) {
+        this.key = key;
+        this.numberOfTriesLeft = numberOfRetries;
+        this.timeToWait = timeToWait;
+        this.task = task;
+    }
+
+    public T call() throws Exception {
+        while (true) {
+            try {
+                return task.call();
+            } catch (InterruptedException | CancellationException e) {
+                throw e;
+            } catch (Exception e) {
+                numberOfTriesLeft--;
+                LOGGER.warn("RETRY: {} failed, number of tries left: {}, will wait {}ms before next try", key, numberOfTriesLeft, timeToWait);
+                if (numberOfTriesLeft <= 0) {
+                    Assert.fail("RETRY LIMIT REACH: " + e.getMessage());
+                }
+                Thread.sleep(timeToWait);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index 3c676d4..ca2ee7e 100644
--- 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -77,23 +77,14 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder
     public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) {
         Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount");
         Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount");
-
-        if (condition.getParameter("generatedPropertyKey") != null && condition.getParameter("generatedPropertyKey").equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
-            // A property is already set on profiles matching the past event condition, use it
-            if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
-                // Check the number of occurences
-                RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
-                if (minimumEventCount != 1) {
-                    builder.gte(minimumEventCount);
-                }
-                if (maximumEventCount != Integer.MAX_VALUE) {
-                    builder.lte(minimumEventCount);
-                }
-                return builder;
-            } else {
-                // Simply get profiles who have the property set
-                return QueryBuilders.existsQuery("systemProperties.pastEvents." + condition.getParameter("generatedPropertyKey"));
-            }
+        String generatedPropertyKey = (String) condition.getParameter("generatedPropertyKey");
+
+        if (generatedPropertyKey != null && generatedPropertyKey.equals(segmentService.getGeneratedPropertyKey((Condition) condition.getParameter("eventCondition"), condition))) {
+            // A property is already set on profiles matching the past event condition: use it to check that the number of occurrences is within [minimumEventCount, maximumEventCount]
+            RangeQueryBuilder builder = QueryBuilders.rangeQuery("systemProperties.pastEvents." + generatedPropertyKey);
+            builder.gte(minimumEventCount);
+            builder.lte(maximumEventCount);
+            return builder;
         } else {
             // No property set - tries to build an idsQuery
             // Build past event condition
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index d096668..d219776 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -783,7 +783,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                 rule.setLinkedItems(Arrays.asList(metadata.getId()));
                 rules.add(rule);
 
-                updateExistingProfilesForPastEventCondition(condition, parentCondition, true);
+                // this is a newly generated rule keeping track of the event count: update all the profiles matching this past event condition
+                // (the occurrence count is written directly on each profile; the rule is new, so there are no stale counts to reset)
+                recalculatePastEventOccurrencesOnProfiles(condition, parentCondition, true, false);
             } else {
                 rule.getLinkedItems().add(metadata.getId());
                 rules.add(rule);
@@ -805,7 +807,18 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         }
     }
 
-    private void updateExistingProfilesForPastEventCondition(Condition eventCondition, Condition parentCondition, boolean forceRefresh) {
+    /**
+     * Recalculates the event occurrence counts on the profiles matching the given past event condition.
+     * @param eventCondition the condition matching the events to count
+     * @param parentCondition the past event condition (holds the generatedPropertyKey parameter)
+     * @param forceRefresh if true, the Profile index is refreshed once at least one profile has been updated
+     * @param resetExistingProfilesNotMatching if true, profiles that currently have a count but no longer have matching events get their count reset to 0.
+     *                                         Passing false is useful when no profile can have a count yet (e.g. for a newly generated rule):
+     *                                         it skips the profile queries and speeds up the process.
+     *                                         Otherwise pass true to make sure stale counts are reset to 0.
+     */
+    private void recalculatePastEventOccurrencesOnProfiles(Condition eventCondition, Condition parentCondition,
+                                                          boolean forceRefresh, boolean resetExistingProfilesNotMatching) {
         long t = System.currentTimeMillis();
         List<Condition> l = new ArrayList<Condition>();
         Condition andCondition = new Condition();
@@ -845,21 +858,33 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         }
 
         String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey");
+        Set<String> existingProfilesWithCounts = resetExistingProfilesNotMatching ? getExistingProfilesWithPastEventOccurrenceCount(propertyKey) : new HashSet<>();
 
         int updatedProfileCount = 0;
         if(pastEventsDisablePartitions) {
             Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
-            updatedProfileCount = updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+            Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+            existingProfilesWithCounts.removeAll(updatedProfiles);
+            updatedProfileCount = updatedProfiles.size();
         } else {
             Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
             long card = m.get("_card").longValue();
             int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-                updatedProfileCount += updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
+                Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+                existingProfilesWithCounts.removeAll(updatedProfiles);
+                updatedProfileCount += updatedProfiles.size();
             }
         }
 
+        // the remaining existing profiles were not updated above, which means they no longer have matching events
+        // within the time-based condition: reset their count to 0
+        if (!existingProfilesWithCounts.isEmpty()) {
+            updatedProfileCount += updatePastEventOccurrencesOnProfiles(
+                    existingProfilesWithCounts.stream().collect(Collectors.toMap(key -> key, value -> 0L)), propertyKey).size();
+        }
+
         if (forceRefresh && updatedProfileCount > 0) {
             persistenceService.refreshIndex(Profile.class, null);
         }
@@ -867,6 +892,34 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         logger.info("{} profiles updated for past event condition in {}ms", updatedProfileCount, System.currentTimeMillis() - t);
     }
 
+    /**
+     * Returns the ids of the profiles that already have a positive event occurrence count stored under the given generated property key.
+     * @param generatedPropertyKey the generated property key of the generated rule for the given past event condition.
+     * @return the set of profile ids.
+     */
+    private Set<String> getExistingProfilesWithPastEventOccurrenceCount(String generatedPropertyKey) {
+        Condition countExistsCondition = new Condition();
+        countExistsCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
+        countExistsCondition.setParameter("propertyName", "systemProperties.pastEvents." + generatedPropertyKey);
+        countExistsCondition.setParameter("comparisonOperator", "greaterThan");
+        countExistsCondition.setParameter("propertyValueInteger", 0);
+
+        Set<String> profileIds = new HashSet<>();
+        if(pastEventsDisablePartitions) {
+            profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition, new TermsAggregate("itemId"),
+                    Profile.ITEM_TYPE, maximumIdsQueryCount).keySet());
+        } else {
+            Map<String, Double> m = persistenceService.getSingleValuesMetrics(countExistsCondition, new String[]{"card"}, "itemId.keyword", Profile.ITEM_TYPE);
+            long card = m.get("_card").longValue();
+            int numParts = (int) (card / aggregateQueryBucketSize) + 2;
+            for (int i = 0; i < numParts; i++) {
+                profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition, new TermsAggregate("itemId", i, numParts),
+                        Profile.ITEM_TYPE).keySet());
+            }
+        }
+        return profileIds;
+    }
+
     public String getGeneratedPropertyKey(Condition condition, Condition parentCondition) {
         try {
             Map<String, Object> m = new HashMap<>();
@@ -890,8 +943,50 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
         }
     }
 
-    private int updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) {
-        int profileUpdatedCount = 0;
+    @Override
+    public void recalculatePastEventConditions() {
+        logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
+        long pastEventsTaskStartTime = System.currentTimeMillis();
+        Set<String> linkedSegments = new HashSet<>();
+        for (Metadata metadata : rulesService.getRuleMetadatas()) {
+            // re-evaluate the auto-generated rules used to store the event occurrence count on the profiles
+            Rule rule = rulesService.getRule(metadata.getId());
+            for (Action action : rule.getActions()) {
+                if (action.getActionTypeId().equals("setEventOccurenceCountAction")) {
+                    Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
+                    if (pastEventCondition.containsParameter("numberOfDays")) {
+                        recalculatePastEventOccurrencesOnProfiles(rule.getCondition(), pastEventCondition, false, true);
+                        logger.info("Event occurrence count on profiles updated for rule: {}", rule.getItemId());
+                        if (rule.getLinkedItems() != null && !rule.getLinkedItems().isEmpty()) {
+                            linkedSegments.addAll(rule.getLinkedItems());
+                        }
+                    }
+                }
+            }
+        }
+
+        // re-evaluate the segments linked to these rules, since the event occurrence counts on the profiles have been updated
+        if (!linkedSegments.isEmpty()) {
+            persistenceService.refreshIndex(Profile.class, null);
+            for (String linkedItem : linkedSegments) {
+                Segment linkedSegment = getSegmentDefinition(linkedItem);
+                if (linkedSegment != null) {
+                    updateExistingProfilesForSegment(linkedSegment);
+                }
+            }
+        }
+
+        logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
+    }
+
+    /**
+     * Updates all the profiles in the given map with their new event occurrence count for the given propertyKey.
+     * @param eventCountByProfile the event count per profile id
+     * @param propertyKey the generated property key of this past event condition, used to store the count on the profile
+     * @return the ids of the profiles whose event occurrence count was submitted for update (a failed batch update is only logged).
+     */
+    private Set<String> updatePastEventOccurrencesOnProfiles(Map<String, Long> eventCountByProfile, String propertyKey) {
+        Set<String> profilesUpdated = new HashSet<>();
         Map<Item, Map> batch = new HashMap<>();
         Iterator<Map.Entry<String, Long>> entryIterator = eventCountByProfile.entrySet().iterator();
         while (entryIterator.hasNext()){
@@ -907,12 +1002,12 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                 Profile profile = new Profile();
                 profile.setItemId(profileId);
                 batch.put(profile, Collections.singletonMap("systemProperties", systemProperties));
+                profilesUpdated.add(profileId);
             }
 
             if (batch.size() == segmentUpdateBatchSize || (!entryIterator.hasNext() && batch.size() > 0)) {
                 try {
                     persistenceService.update(batch, null, Profile.class);
-                    profileUpdatedCount += batch.size();
                 } catch (Exception e) {
                     logger.error("Error updating {} profiles for past event system properties", batch.size(), e);
                 } finally {
@@ -920,7 +1015,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
                 }
             }
         }
-        return profileUpdatedCount;
+        return profilesUpdated;
     }
 
     private String getMD5(String md5) {
@@ -1134,20 +1229,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe
             @Override
             public void run() {
                 try {
-                    logger.info("running scheduled task to recalculate segments with pastEventCondition conditions");
-                    long pastEventsTaskStartTime = System.currentTimeMillis();
-                    for (Metadata metadata : rulesService.getRuleMetadatas()) {
-                        Rule rule = rulesService.getRule(metadata.getId());
-                        for (Action action : rule.getActions()) {
-                            if (action.getActionTypeId().equals("setEventOccurenceCountAction")) {
-                                Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition");
-                                if (pastEventCondition.containsParameter("numberOfDays")) {
-                                    updateExistingProfilesForPastEventCondition(rule.getCondition(), pastEventCondition, false);
-                                }
-                            }
-                        }
-                    }
-                    logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime);
+                    recalculatePastEventConditions();
                 } catch (Throwable t) {
                     logger.error("Error while updating profiles for past event conditions", t);
                 }
