This is an automated email from the ASF dual-hosted git repository. dgriffon pushed a commit to branch fix-duplicate-linkedItems in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 745376e7b5fa4289658886e1d239fe6695a4b364 Author: David Griffon <[email protected]> AuthorDate: Thu Nov 18 10:50:18 2021 +0100 UNOMI-527 : fix duplicated linkedItems --- .../main/java/org/apache/unomi/api/rules/Rule.java | 3 +- .../java/org/apache/unomi/itests/SegmentIT.java | 49 ++++++++++ .../services/impl/segments/SegmentServiceImpl.java | 104 ++++++++++----------- 3 files changed, 100 insertions(+), 56 deletions(-) diff --git a/api/src/main/java/org/apache/unomi/api/rules/Rule.java b/api/src/main/java/org/apache/unomi/api/rules/Rule.java index d3a44cb..0aedfb1 100644 --- a/api/src/main/java/org/apache/unomi/api/rules/Rule.java +++ b/api/src/main/java/org/apache/unomi/api/rules/Rule.java @@ -22,6 +22,7 @@ import org.apache.unomi.api.actions.Action; import org.apache.unomi.api.conditions.Condition; import java.util.List; +import java.util.stream.Collectors; /** * A conditional set of actions to be executed in response to incoming events. Triggering of rules is guarded by a condition: the rule is only triggered if the associated @@ -122,7 +123,7 @@ public class Rule extends MetadataItem { * @param linkedItems the linked items */ public void setLinkedItems(List<String> linkedItems) { - this.linkedItems = linkedItems; + this.linkedItems = linkedItems == null ? null : linkedItems.stream().distinct().collect(Collectors.toList()); } /** diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java index bbb0354..33e0f11 100644 --- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java @@ -21,6 +21,7 @@ import org.apache.unomi.api.Event; import org.apache.unomi.api.Metadata; import org.apache.unomi.api.Profile; import org.apache.unomi.api.conditions.Condition; +import org.apache.unomi.api.rules.Rule; import org.apache.unomi.api.segments.Scoring; import org.apache.unomi.api.segments.ScoringElement; import org.apache.unomi.api.segments.Segment; @@ -358,4 +359,52 @@ public class SegmentIT extends BaseIT { updatedProfile -> !updatedProfile.getScores().containsKey("past-event-scoring-test"), 1000, 20); } + + @Test + public void testLinkedItems() throws Exception { + + // create the past event condition + Condition pastEventCondition = new Condition(definitionsService.getConditionType("pastEventCondition")); + pastEventCondition.setParameter("minimumEventCount", 1); + pastEventCondition.setParameter("maximumEventCount", 2); + + pastEventCondition.setParameter("fromDate", "2000-07-15T07:00:00Z"); + pastEventCondition.setParameter("toDate", "2001-01-15T07:00:00Z"); + ; + Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition")); + pastEventEventCondition.setParameter("eventTypeId", "test-event-type"); + pastEventCondition.setParameter("eventCondition", pastEventEventCondition); + + // create the scoring + Metadata scoringMetadata = new Metadata("past-event-scoring-test"); + Scoring scoring = new Scoring(scoringMetadata); + List<ScoringElement> scoringElements = new ArrayList<>(); + ScoringElement scoringElement = new ScoringElement(); + scoringElement.setCondition(pastEventCondition); + scoringElement.setValue(50); + scoringElements.add(scoringElement); + scoring.setElements(scoringElements); + segmentService.setScoringDefinition(scoring); + Thread.sleep(5000); + // Check linkedItems + List<Rule> rules = persistenceService.getAllItems(Rule.class); + Rule scoringRule = rules.stream().filter(rule -> rule.getItemId().equals(pastEventCondition.getParameter("generatedPropertyKey"))).findFirst().get(); + Assert.assertEquals("Scoring linked Item should be one", 1, scoringRule.getLinkedItems().size()); + + // save the scoring once again + segmentService.setScoringDefinition(scoring); + Thread.sleep(5000); + // Check linkedItems + rules = persistenceService.getAllItems(Rule.class); + scoringRule = rules.stream().filter(rule -> rule.getItemId().equals(pastEventCondition.getParameter("generatedPropertyKey"))).findFirst().get(); + Assert.assertEquals("Scoring linked Item should be one", 1, scoringRule.getLinkedItems().size()); + + // Remove scoring + segmentService.removeSegmentDefinition(scoring.getItemId(), true); + Thread.sleep(5000); + // Check linkedItems + rules = persistenceService.getAllItems(Rule.class); + boolean isRule = rules.stream().anyMatch(rule -> rule.getItemId().equals(pastEventCondition.getParameter("generatedPropertyKey"))); + Assert.assertFalse("Rule is properly removed", isRule); + } } \ No newline at end of file diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index b4bd9e3..1742328 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -143,7 +143,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe this.batchSegmentProfileUpdate = batchSegmentProfileUpdate; } - public void setSendProfileUpdateEventForSegmentUpdate(boolean sendProfileUpdateEventForSegmentUpdate){ + public void setSendProfileUpdateEventForSegmentUpdate(boolean sendProfileUpdateEventForSegmentUpdate) { this.sendProfileUpdateEventForSegmentUpdate = sendProfileUpdateEventForSegmentUpdate; } @@ -285,8 +285,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe private boolean checkSegmentDeletionImpact(Condition condition, String segmentToDeleteId) { if (condition != null) { - @SuppressWarnings("unchecked") - final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); + @SuppressWarnings("unchecked") final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); if (subConditions != null) { for (Condition subCondition : subConditions) { if (checkSegmentDeletionImpact(subCondition, segmentToDeleteId)) { @@ -294,8 +293,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } } } else if ("profileSegmentCondition".equals(condition.getConditionTypeId())) { - @SuppressWarnings("unchecked") - final List<String> referencedSegmentIds = (List<String>) condition.getParameter("segments"); + @SuppressWarnings("unchecked") final List<String> referencedSegmentIds = (List<String>) condition.getParameter("segments"); if (referencedSegmentIds.indexOf(segmentToDeleteId) >= 0) { return true; @@ -316,8 +314,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe */ private Condition updateSegmentDependentCondition(Condition condition, String segmentId) { if ("booleanCondition".equals(condition.getConditionTypeId())) { - @SuppressWarnings("unchecked") - final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); + @SuppressWarnings("unchecked") final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); List<Condition> updatedSubConditions = new LinkedList<>(); for (Condition subCondition : subConditions) { Condition updatedCondition = updateSegmentDependentCondition(subCondition, segmentId); @@ -336,8 +333,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe return null; } } else if ("profileSegmentCondition".equals(condition.getConditionTypeId())) { - @SuppressWarnings("unchecked") - final List<String> referencedSegmentIds = (List<String>) condition.getParameter("segments"); + @SuppressWarnings("unchecked") final List<String> referencedSegmentIds = (List<String>) condition.getParameter("segments"); if (referencedSegmentIds.indexOf(segmentId) >= 0) { referencedSegmentIds.remove(segmentId); if (referencedSegmentIds.isEmpty()) { @@ -401,8 +397,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe long profileRemovalStartTime = System.currentTimeMillis(); if (batchSegmentProfileUpdate && previousProfiles.size() > 0) { batchUpdateProfilesSegment(segmentId, previousProfiles, false); - } - else { + } else { for (Profile profileToRemove : previousProfiles) { Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToRemove, segmentId, false); persistenceService.update(profileToRemove, null, Profile.class, sourceMap); @@ -577,17 +572,17 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe persistenceService.save(scoring); persistenceService.createMapping(Profile.ITEM_TYPE, String.format( - "{\n" + - " \"properties\": {\n" + - " \"scores\": {\n" + - " \"properties\": {\n" + - " \"%s\": {\n" + - " \"type\":\"long\"\n" + - " }\n" + - " }\n" + - " }\n" + - " }\n" + - "}", scoring.getItemId())); + "{\n" + + " \"properties\": {\n" + + " \"scores\": {\n" + + " \"properties\": {\n" + + " \"%s\": {\n" + + " \"type\":\"long\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}", scoring.getItemId())); updateExistingProfilesForScoring(scoring.getItemId(), scoring.getElements(), scoring.getMetadata().isEnabled()); } @@ -606,8 +601,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe private boolean checkScoringDeletionImpact(Condition condition, String scoringToDeleteId) { if (condition != null) { - @SuppressWarnings("unchecked") - final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); + @SuppressWarnings("unchecked") final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); if (subConditions != null) { for (Condition subCondition : subConditions) { if (checkScoringDeletionImpact(subCondition, scoringToDeleteId)) { @@ -634,8 +628,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe */ private Condition updateScoringDependentCondition(Condition condition, String scoringId) { if ("booleanCondition".equals(condition.getConditionTypeId())) { - @SuppressWarnings("unchecked") - final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); + @SuppressWarnings("unchecked") final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); List<Condition> updatedSubConditions = new LinkedList<>(); for (Condition subCondition : subConditions) { Condition updatedCondition = updateScoringDependentCondition(subCondition, scoringId); @@ -762,7 +755,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe private void clearAutoGeneratedRules(List<Rule> rules, String idWithScope) { for (Rule previousRule : rules) { - previousRule.getLinkedItems().remove(idWithScope); + previousRule.getLinkedItems().removeAll(Collections.singleton(idWithScope)); if (previousRule.getLinkedItems().isEmpty()) { // todo remove profile properties ? persistenceService.remove(previousRule.getItemId(), Rule.class); @@ -789,15 +782,14 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe rule.setActions(Arrays.asList(action)); rule.setLinkedItems(Arrays.asList(metadata.getId())); - rules.add(rule); // it's a new generated rules to keep track of the event count, we should update all the profile that match this past event // it will update the count of event occurrence on the profile directly recalculatePastEventOccurrencesOnProfiles(condition, parentCondition, true, false); - } else { + } else if (!rule.getLinkedItems().contains(metadata.getId())) { rule.getLinkedItems().add(metadata.getId()); - rules.add(rule); } + rules.add(rule); } } else { Collection<Object> values = new ArrayList<>(condition.getParameterValues().values()); @@ -817,16 +809,17 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe /** * This will recalculate the event counts on the profiles that match the given past event condition - * @param eventCondition the real condition - * @param parentCondition the past event condition - * @param forceRefresh will refresh the Profile index in case it's true + * + * @param eventCondition the real condition + * @param parentCondition the past event condition + * @param forceRefresh will refresh the Profile index in case it's true * @param resetExistingProfilesNotMatching if true, will reset existing profiles having a count to 0, in case they do not have events matching anymore * ("false" can be useful when you know that no existing profiles already exist because it's a new rule for example, * in that case setting this to "false" allow to skip profiles queries and speedup this process. * Otherwise use "true" here to be sure the count is reset to 0 on profiles that need to be reset) */ private void recalculatePastEventOccurrencesOnProfiles(Condition eventCondition, Condition parentCondition, - boolean forceRefresh, boolean resetExistingProfilesNotMatching) { + boolean forceRefresh, boolean resetExistingProfilesNotMatching) { long t = System.currentTimeMillis(); List<Condition> l = new ArrayList<Condition>(); Condition andCondition = new Condition(); @@ -848,7 +841,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe numberOfDaysCondition.setParameter("propertyValue", "now-" + numberOfDays + "d"); l.add(numberOfDaysCondition); } - if (fromDate != null) { + if (fromDate != null) { Condition startDateCondition = new Condition(); startDateCondition.setConditionType(definitionsService.getConditionType("sessionPropertyCondition")); startDateCondition.setParameter("propertyName", "timeStamp"); @@ -856,7 +849,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe startDateCondition.setParameter("propertyValueDate", fromDate); l.add(startDateCondition); } - if (toDate != null) { + if (toDate != null) { Condition endDateCondition = new Condition(); endDateCondition.setConditionType(definitionsService.getConditionType("sessionPropertyCondition")); endDateCondition.setParameter("propertyName", "timeStamp"); @@ -869,9 +862,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe Set<String> existingProfilesWithCounts = resetExistingProfilesNotMatching ? getExistingProfilesWithPastEventOccurrenceCount(propertyKey) : Collections.emptySet(); int updatedProfileCount = 0; - if(pastEventsDisablePartitions) { + if (pastEventsDisablePartitions) { Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount); - Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey); + Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey); existingProfilesWithCounts.removeAll(updatedProfiles); updatedProfileCount = updatedProfiles.size(); } else { @@ -880,7 +873,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe int numParts = (int) (card / aggregateQueryBucketSize) + 2; for (int i = 0; i < numParts; i++) { Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE); - Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey); + Set<String> updatedProfiles = updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey); existingProfilesWithCounts.removeAll(updatedProfiles); updatedProfileCount += updatedProfiles.size(); } @@ -902,6 +895,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe /** * Return the list of profile ids, for profiles that already have an event count matching the generated property key + * * @param generatedPropertyKey the generated property key of the generated rule for the given past event condition. * @return the list of profile ids. */ @@ -913,7 +907,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe countExistsCondition.setParameter("propertyValueInteger", 0); Set<String> profileIds = new HashSet<>(); - if(pastEventsDisablePartitions) { + if (pastEventsDisablePartitions) { profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition, new TermsAggregate("itemId"), Profile.ITEM_TYPE, maximumIdsQueryCount).keySet()); } else { @@ -946,7 +940,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe String key = CustomObjectMapper.getObjectMapper().writeValueAsString(m); return "eventTriggered" + getMD5(key); } catch (JsonProcessingException e) { - logger.error("Cannot generate key",e); + logger.error("Cannot generate key", e); return null; } } @@ -995,15 +989,16 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe /** * This will update all the profiles in the given map with the according new count occurrence for the given propertyKey + * * @param eventCountByProfile the events count per profileId map - * @param propertyKey the generate property key for this past event condition, to keep track of the count in the profile + * @param propertyKey the generate property key for this past event condition, to keep track of the count in the profile * @return the list of profiles for witch the count of event occurrences have been updated. */ private Set<String> updatePastEventOccurrencesOnProfiles(Map<String, Long> eventCountByProfile, String propertyKey) { Set<String> profilesUpdated = new HashSet<>(); Map<Item, Map> batch = new HashMap<>(); Iterator<Map.Entry<String, Long>> entryIterator = eventCountByProfile.entrySet().iterator(); - while (entryIterator.hasNext()){ + while (entryIterator.hasNext()) { Map.Entry<String, Long> entry = entryIterator.next(); String profileId = entry.getKey(); if (!profileId.startsWith("_")) { @@ -1088,22 +1083,21 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - updateProfilesForSegmentStartTime); } - private long updateProfilesSegment(Condition profilesToUpdateCondition, String segmentId, boolean isAdd){ - long updatedProfileCount= 0; + private long updateProfilesSegment(Condition profilesToUpdateCondition, String segmentId, boolean isAdd) { + long updatedProfileCount = 0; PartialList<Profile> profiles = persistenceService.query(profilesToUpdateCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m"); while (profiles != null && profiles.getList().size() > 0) { long startTime = System.currentTimeMillis(); if (batchSegmentProfileUpdate) { batchUpdateProfilesSegment(segmentId, profiles.getList(), isAdd); - } - else { //send update profile one by one + } else { //send update profile one by one for (Profile profileToUpdate : profiles.getList()) { Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd); persistenceService.update(profileToUpdate, null, Profile.class, sourceMap); } } - if (sendProfileUpdateEventForSegmentUpdate) + if (sendProfileUpdateEventForSegmentUpdate) sendProfileUpdatedEvent(profiles.getList()); updatedProfileCount += profiles.size(); @@ -1118,7 +1112,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe private void batchUpdateProfilesSegment(String segmentId, List<Profile> profiles, boolean isAdd) { Map<Item, Map> profileToPropertiesMap = new HashMap<>(); for (Profile profileToUpdate : profiles) { - Map<String,Object> propertiesToUpdate = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd); + Map<String, Object> propertiesToUpdate = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd); profileToPropertiesMap.put(profileToUpdate, propertiesToUpdate); } List<String> failedItemsIds = persistenceService.update(profileToPropertiesMap, null, Profile.class); @@ -1126,8 +1120,8 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe failedItemsIds.forEach(s -> retryFailedSegmentUpdate(s, segmentId, isAdd)); } - private void retryFailedSegmentUpdate(String profileId, String segmentId, boolean isAdd){ - if (maxRetriesForUpdateProfileSegment > 0){ + private void retryFailedSegmentUpdate(String profileId, String segmentId, boolean isAdd) { + if (maxRetriesForUpdateProfileSegment > 0) { RetryPolicy retryPolicy = new RetryPolicy() .withDelay(Duration.ofSeconds(secondsDelayForRetryUpdateProfileSegment)) .withMaxRetries(maxRetriesForUpdateProfileSegment); @@ -1151,9 +1145,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } private void sendProfileUpdatedEvent(Profile profile) { - Event profileUpdated = new Event("profileUpdated", null, profile, null, null, profile, new Date()); - profileUpdated.setPersistent(false); - eventService.send(profileUpdated); + Event profileUpdated = new Event("profileUpdated", null, profile, null, null, profile, new Date()); + profileUpdated.setPersistent(false); + eventService.send(profileUpdated); } private Map<String, Object> buildPropertiesMapForUpdateSegment(Profile profile, String segmentId, boolean isAdd) { @@ -1257,7 +1251,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe long initialDelay = SchedulerServiceImpl.getTimeDiffInSeconds(dailyDateExprEvaluationHourUtc, ZonedDateTime.now(ZoneOffset.UTC)); logger.info("daily DateExpr segments will run at fixed rate, initialDelay={}, taskExecutionPeriod={}, ", initialDelay, TimeUnit.DAYS.toSeconds(1)); - schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toSeconds(1), TimeUnit.SECONDS); + schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toSeconds(1), TimeUnit.SECONDS); } private void loadScripts() throws IOException {
