This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch fixDateRelativeSegmentScoringRecalculateJob in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 5afda6d437cc9a1e2f430442217b89e2c4766129 Author: Kevan <[email protected]> AuthorDate: Wed Dec 29 10:39:31 2021 +0100 UNOMI-521: daily job to recalculate segments and scorings that contains date expression conditions --- .../apache/unomi/api/services/SegmentService.java | 4 +- .../java/org/apache/unomi/itests/SegmentIT.java | 83 ++++++++++++++++++++-- .../services/impl/segments/SegmentServiceImpl.java | 64 ++++++++--------- 3 files changed, 113 insertions(+), 38 deletions(-) diff --git a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java index a54489b..b9fdff6 100644 --- a/api/src/main/java/org/apache/unomi/api/services/SegmentService.java +++ b/api/src/main/java/org/apache/unomi/api/services/SegmentService.java @@ -229,11 +229,13 @@ public interface SegmentService { /** * This will recalculate the past event conditions from existing rules + * It will also recalculate date relative Segments and Scorings (when they contains date expression conditions for example) * This operation can be heavy and take time, it will: * - browse existing rules to extract the past event condition, * - query the matching events for those conditions, * - update the corresponding profiles - * - reevaluate segments linked to this rules to engaged/disengaged profiles after the occurrences have been updated + * - reevaluate segments/scorings linked to this rules to engaged/disengaged profiles after the occurrences have been updated + * - reevaluate segments/scoring that contains date expressions * So use it carefully or execute this method in a dedicated thread. 
*/ void recalculatePastEventConditions(); diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java index 047fdd0..55f3ce9 100644 --- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java @@ -46,10 +46,7 @@ import javax.inject.Inject; import java.text.SimpleDateFormat; import java.time.LocalDate; import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; @RunWith(PaxExam.class) @ExamReactorStrategy(PerSuite.class) @@ -508,4 +505,82 @@ public class SegmentIT extends BaseIT { boolean isRule = rules.stream().anyMatch(rule -> rule.getItemId().equals(pastEventCondition.getParameter("generatedPropertyKey"))); Assert.assertFalse("Rule is properly removed", isRule); } + + @Test + public void testSegmentWithRelativeDateExpressions() throws Exception { + // create Profile + Profile profile = new Profile(); + profile.setItemId("test_profile_id"); + profileService.save(profile); + persistenceService.refreshIndex(Profile.class, null); // wait for profile to be full persisted and index + + // create the conditions + Condition booleanCondition = new Condition(definitionsService.getConditionType("booleanCondition")); + List<Condition> subConditions = new ArrayList<>(); + Condition dateExpCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); + dateExpCondition.setParameter("propertyName", "properties.lastVisit"); + dateExpCondition.setParameter("comparisonOperator", "greaterThanOrEqualTo"); + dateExpCondition.setParameter("propertyValueDateExpr", "now-5d"); + subConditions.add(dateExpCondition); + Condition otherCondition = new Condition(definitionsService.getConditionType("profilePropertyCondition")); + otherCondition.setParameter("propertyName", "properties.address"); + 
otherCondition.setParameter("comparisonOperator", "notEquals"); + otherCondition.setParameter("propertyValueDateExpr", "test"); + subConditions.add(otherCondition); + booleanCondition.setParameter("operator", "and"); + booleanCondition.setParameter("subConditions", subConditions); + + // create segment and scoring + Metadata segmentMetadata = new Metadata("relative-date-segment-test"); + Segment segment = new Segment(segmentMetadata); + segment.setCondition(booleanCondition); + segmentService.setSegmentDefinition(segment); + Metadata scoringMetadata = new Metadata("relative-date-scoring-test"); + Scoring scoring = new Scoring(scoringMetadata); + ScoringElement scoringElement = new ScoringElement(); + scoringElement.setCondition(booleanCondition); + scoringElement.setValue(5); + scoring.setElements(Collections.singletonList(scoringElement)); + segmentService.setScoringDefinition(scoring); + Thread.sleep(5000); + + // insure the profile is not yet engaged since we directly saved the profile in ES + profile = profileService.load("test_profile_id"); + Assert.assertFalse("Profile should not be engaged in the segment", profile.getSegments().contains("relative-date-segment-test")); + Assert.assertTrue("Profile should not be engaged in the scoring", profile.getScores() == null || !profile.getScores().containsKey("relative-date-scoring-test")); + + // Update the profile last visit to match the segment ans the scoring + ZoneId defaultZoneId = ZoneId.systemDefault(); + LocalDate localDate = LocalDate.now().minusDays(3); + profile.setProperty("lastVisit", Date.from(localDate.atStartOfDay(defaultZoneId).toInstant())); + profileService.save(profile); + persistenceService.refreshIndex(Profile.class, null); // wait for profile to be full persisted and index + + // insure the profile is not yet engaged since we directly saved the profile in ES + profile = profileService.load("test_profile_id"); + Assert.assertFalse("Profile should not be engaged in the segment", 
profile.getSegments().contains("relative-date-segment-test")); + Assert.assertTrue("Profile should not be engaged in the scoring", profile.getScores() == null || !profile.getScores().containsKey("relative-date-scoring-test")); + + // now force the recalculation of the date relative segments/scorings + segmentService.recalculatePastEventConditions(); + persistenceService.refreshIndex(Profile.class, null); + keepTrying("Profile should be engaged in the segment and scoring", + () -> profileService.load("test_profile_id"), + updatedProfile -> updatedProfile.getSegments().contains("relative-date-segment-test") && updatedProfile.getScores() != null && updatedProfile.getScores().get("relative-date-scoring-test") == 5, + 1000, 20); + + // update the profile to a date out of date expression + localDate = LocalDate.now().minusDays(15); + profile.setProperty("lastVisit", Date.from(localDate.atStartOfDay(defaultZoneId).toInstant())); + profileService.save(profile); + persistenceService.refreshIndex(Profile.class, null); // wait for profile to be full persisted and index + + // now force the recalculation of the date relative segments/scorings + segmentService.recalculatePastEventConditions(); + persistenceService.refreshIndex(Profile.class, null); + keepTrying("Profile should not be engaged in the segment and scoring anymore", + () -> profileService.load("test_profile_id"), + updatedProfile -> !updatedProfile.getSegments().contains("relative-date-segment-test") && (updatedProfile.getScores() == null || !updatedProfile.getScores().containsKey("relative-date-scoring-test")), + 1000, 20); + } } \ No newline at end of file diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index 1742328..dc1333b 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -947,11 +947,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe @Override public void recalculatePastEventConditions() { - logger.info("running scheduled task to recalculate segments with pastEventCondition conditions"); - long pastEventsTaskStartTime = System.currentTimeMillis(); - Set<String> linkedItems = new HashSet<>(); + Set<String> segmentOrScoringIdsToReevaluate = new HashSet<>(); + // reevaluate auto generated rules used to store the event occurrence count on the profile for (Metadata metadata : rulesService.getRuleMetadatas()) { - // reevaluate auto generated rules used to store the event occurrence count on the profile Rule rule = rulesService.getRule(metadata.getId()); for (Action action : rule.getActions()) { if (action.getActionTypeId().equals("setEventOccurenceCountAction")) { @@ -960,31 +958,46 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe recalculatePastEventOccurrencesOnProfiles(rule.getCondition(), pastEventCondition, true, true); logger.info("Event occurrence count on profiles updated for rule: {}", rule.getItemId()); if (rule.getLinkedItems() != null && rule.getLinkedItems().size() > 0) { - linkedItems.addAll(rule.getLinkedItems()); + segmentOrScoringIdsToReevaluate.addAll(rule.getLinkedItems()); } } } } } - - // reevaluate segments and scoring linked to this rule, since we have updated the event occurrences count on the profiles. 
- if (linkedItems.size() > 0) { + int pastEventSegmentsAndScoringsSize = segmentOrScoringIdsToReevaluate.size(); + logger.info("Found {} segments or scoring plans containing pastEventCondition conditions", pastEventSegmentsAndScoringsSize); + + // get Segments and Scoring that contains relative date expressions + segmentOrScoringIdsToReevaluate.addAll(allSegments.stream() + .filter(segment -> segment.getCondition() != null && segment.getCondition().toString().contains("propertyValueDateExpr")) + .map(Item::getItemId) + .collect(Collectors.toList())); + + segmentOrScoringIdsToReevaluate.addAll(allScoring.stream() + .filter(scoring -> scoring.getElements() != null && scoring.getElements().size() > 0 && scoring.getElements().stream() + .anyMatch(scoringElement -> scoringElement != null && scoringElement.getCondition() != null && scoringElement.getCondition().toString().contains("propertyValueDateExpr"))) + .map(Item::getItemId) + .collect(Collectors.toList())); + logger.info("Found {} segments or scoring plans containing date relative expressions", segmentOrScoringIdsToReevaluate.size() - pastEventSegmentsAndScoringsSize); + + // reevaluate segments and scoring. 
+ if (segmentOrScoringIdsToReevaluate.size() > 0) { persistenceService.refreshIndex(Profile.class, null); - for (String linkedItem : linkedItems) { + for (String linkedItem : segmentOrScoringIdsToReevaluate) { Segment linkedSegment = getSegmentDefinition(linkedItem); if (linkedSegment != null) { + logger.info("Start segment recalculation for segment: {} - {}", linkedSegment.getItemId(), linkedSegment.getMetadata().getName()); updateExistingProfilesForSegment(linkedSegment); continue; } Scoring linkedScoring = getScoringDefinition(linkedItem); if (linkedScoring != null) { + logger.info("Start scoring plan recalculation for scoring plan: {} - {}", linkedScoring.getItemId(), linkedScoring.getMetadata().getName()); updateExistingProfilesForScoring(linkedScoring.getItemId(), linkedScoring.getElements(), linkedScoring.getMetadata().isEnabled()); } } } - - logger.info("finished recalculate segments with pastEventCondition conditions in {}ms. ", System.currentTimeMillis() - pastEventsTaskStartTime); } /** @@ -1212,13 +1225,18 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe @Override public void run() { try { + long currentTimeMillis = System.currentTimeMillis(); + logger.info("running scheduled task to recalculate segments and scoring that contains date relative conditions"); recalculatePastEventConditions(); + logger.info("finished recalculate segments and scoring that contains date relative conditions in {}ms. 
", System.currentTimeMillis() - currentTimeMillis); } catch (Throwable t) { - logger.error("Error while updating profiles for past event conditions", t); + logger.error("Error while updating profiles for segments and scoring that contains date relative conditions", t); } } }; - schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 1, taskExecutionPeriod, TimeUnit.DAYS); + long initialDelay = SchedulerServiceImpl.getTimeDiffInSeconds(dailyDateExprEvaluationHourUtc, ZonedDateTime.now(ZoneOffset.UTC)); + logger.info("daily recalculation job for segments and scoring that contains date relative conditions will run at fixed rate, initialDelay={}, taskExecutionPeriod={}", initialDelay, TimeUnit.DAYS.toSeconds(taskExecutionPeriod)); + schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toSeconds(taskExecutionPeriod), TimeUnit.SECONDS); task = new TimerTask() { @Override @@ -1232,26 +1250,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } }; schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0, segmentRefreshInterval, TimeUnit.MILLISECONDS); - - task = new TimerTask() { - @Override - public void run() { - try { - long dateExprTaskStartTime = System.currentTimeMillis(); - List<Segment> dateExprSegments = allSegments.stream().filter(segment -> - segment.getCondition().toString().contains("propertyValueDateExpr")).collect(Collectors.toList()); - logger.info("running scheduled task to recalculate segments with DateExpr condition, found {} segments", dateExprSegments.size()); - dateExprSegments.forEach(segment -> updateExistingProfilesForSegment(segment)); - logger.info("finished recalculate segments with DateExpr conditions in {}ms.
", System.currentTimeMillis() - dateExprTaskStartTime); - } catch (Throwable t) { - logger.error("Error while updating profiles for DateExpr conditions", t); - } - } - }; - - long initialDelay = SchedulerServiceImpl.getTimeDiffInSeconds(dailyDateExprEvaluationHourUtc, ZonedDateTime.now(ZoneOffset.UTC)); - logger.info("daily DateExpr segments will run at fixed rate, initialDelay={}, taskExecutionPeriod={}, ", initialDelay, TimeUnit.DAYS.toSeconds(1)); - schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, initialDelay, TimeUnit.DAYS.toSeconds(1), TimeUnit.SECONDS); } private void loadScripts() throws IOException {
