This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch UNOMI-442-segment-past-event in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 92ae092d4793591a0d375537e0fb8f6b62151447 Author: Kevan <[email protected]> AuthorDate: Wed Mar 10 16:03:02 2021 +0100 UNOMI-442: fix creation of Segment that contains past event condition, correctly engage profiles during creation process --- .../java/org/apache/unomi/itests/SegmentIT.java | 61 ++++++++++++++++++++- .../services/impl/segments/SegmentServiceImpl.java | 63 ++++++++++++++-------- 2 files changed, 99 insertions(+), 25 deletions(-) diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java index fceb70c..2b5abf9 100644 --- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java @@ -17,11 +17,17 @@ package org.apache.unomi.itests; +import org.apache.unomi.api.Event; import org.apache.unomi.api.Metadata; +import org.apache.unomi.api.Profile; import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.segments.Segment; +import org.apache.unomi.api.services.EventService; +import org.apache.unomi.api.services.ProfileService; import org.apache.unomi.api.services.SegmentService; import org.apache.unomi.api.exceptions.BadSegmentConditionException; +import org.apache.unomi.persistence.spi.PersistenceService; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -34,6 +40,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.Date; import java.util.List; @RunWith(PaxExam.class) @@ -42,15 +51,30 @@ public class SegmentIT extends BaseIT { private final static Logger LOGGER = LoggerFactory.getLogger(SegmentIT.class); private final static String SEGMENT_ID = "test-segment-id-2"; - @Inject - @Filter(timeout = 600000) + @Inject @Filter(timeout = 600000) protected SegmentService segmentService; + @Inject @Filter(timeout = 600000) + protected ProfileService profileService; + + @Inject @Filter(timeout = 600000) + protected EventService eventService; + + @Inject @Filter(timeout = 600000) + protected PersistenceService persistenceService; + @Before public void setUp() throws InterruptedException { removeItems(Segment.class); } + @After + public void tearDown() throws InterruptedException { + removeItems(Segment.class); + removeItems(Profile.class); + removeItems(Event.class); + } + @Test public void testSegments() { Assert.assertNotNull("Segment service should be available", segmentService); @@ -111,4 +135,37 @@ public class SegmentIT extends BaseIT { segmentService.removeSegmentDefinition(SEGMENT_ID, false); } + + @Test + public void testSegmentWithPastEventCondition() throws InterruptedException { + // create Profile + Profile profile = new Profile(); + profile.setItemId("test_profile_id"); + profileService.save(profile); + persistenceService.refreshIndex(Profile.class, null); // wait for profile to be full persisted and index + + // send event for profile from a previous date (today -3 days) + ZoneId defaultZoneId = ZoneId.systemDefault(); + LocalDate localDate = LocalDate.now().minusDays(3); + Event testEvent = new Event("test-event-type", null, profile, null, null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant())); + testEvent.setPersistent(true); + eventService.send(testEvent); + persistenceService.refreshIndex(Event.class, null); // wait for event to be fully persisted and indexed + + // create the segment + Metadata segmentMetadata = new Metadata("past-event-segment-test"); + Segment segment = new Segment(segmentMetadata); + Condition segmentCondition = new Condition(definitionsService.getConditionType("pastEventCondition")); + segmentCondition.setParameter("numberOfDays", 10); + Condition pastEventEventCondition = new Condition(definitionsService.getConditionType("eventTypeCondition")); + pastEventEventCondition.setParameter("eventTypeId", "test-event-type"); + segmentCondition.setParameter("eventCondition", pastEventEventCondition); + segment.setCondition(segmentCondition); + segmentService.setSegmentDefinition(segment); + + // insure the profile that did the past event condition is correctly engaged in the segment. + Thread.sleep(5000); + profile = profileService.load("test_profile_id"); + Assert.assertTrue("Profile should be engaged in the segment", profile.getSegments().contains("past-event-segment-test")); + } } diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index 7d8c2ce..2f159b0 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -776,7 +776,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe rule.setLinkedItems(Arrays.asList(metadata.getId())); rules.add(rule); - updateExistingProfilesForPastEventCondition(condition, parentCondition); + updateExistingProfilesForPastEventCondition(condition, parentCondition, true); } else { rule.getLinkedItems().add(metadata.getId()); rules.add(rule); @@ -798,7 +798,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } } - private void updateExistingProfilesForPastEventCondition(Condition eventCondition, Condition parentCondition) { + private void updateExistingProfilesForPastEventCondition(Condition eventCondition, Condition parentCondition, boolean forceRefresh) { long t = System.currentTimeMillis(); List<Condition> l = new ArrayList<Condition>(); Condition andCondition = new Condition(); @@ -839,20 +839,25 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey"); + int updatedProfileCount = 0; if(pastEventsDisablePartitions) { Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount); - updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey); + updatedProfileCount = updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey); } else { Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE); long card = m.get("_card").longValue(); int numParts = (int) (card / aggregateQueryBucketSize) + 2; for (int i = 0; i < numParts; i++) { Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE); - updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey); + updatedProfileCount += updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey); } } - logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t); + if (forceRefresh && updatedProfileCount > 0) { + persistenceService.refreshIndex(Profile.class, null); + } + + logger.info("{} profiles updated for past event condition in {}ms", updatedProfileCount, System.currentTimeMillis() - t); } public String getGeneratedPropertyKey(Condition condition, Condition parentCondition) { @@ -878,25 +883,37 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } } - private void updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) { - for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) { - String profileId = entry.getKey(); - if (!profileId.startsWith("_")) { - Map<String, Long> pastEventCounts = new HashMap<>(); - pastEventCounts.put(propertyKey, entry.getValue()); - Map<String, Object> systemProperties = new HashMap<>(); - systemProperties.put("pastEvents", pastEventCounts); - try { - systemProperties.put("lastUpdated", new Date()); - Profile profile = new Profile(); - profile.setItemId(profileId); - persistenceService.update(profile, null, Profile.class, "systemProperties", systemProperties); - } catch (Exception e) { - logger.error("Error updating profile {} past event system properties", profileId, e); - } - } + private int updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) { + int profileUpdatedCount = 0; + Map<Item, Map> batch = new HashMap<>(); + Iterator<Map.Entry<String, Long>> entryIterator = eventCountByProfile.entrySet().iterator(); + while (entryIterator.hasNext()){ + Map.Entry<String, Long> entry = entryIterator.next(); + String profileId = entry.getKey(); + if (!profileId.startsWith("_")) { + Map<String, Long> pastEventCounts = new HashMap<>(); + pastEventCounts.put(propertyKey, entry.getValue()); + Map<String, Object> systemProperties = new HashMap<>(); + systemProperties.put("pastEvents", pastEventCounts); + systemProperties.put("lastUpdated", new Date()); + + Profile profile = new Profile(); + profile.setItemId(profileId); + batch.put(profile, Collections.singletonMap("systemProperties", systemProperties)); } + if (batch.size() == segmentUpdateBatchSize || (!entryIterator.hasNext() && batch.size() > 0)) { + try { + persistenceService.update(batch, null, Profile.class); + profileUpdatedCount += batch.size(); + } catch (Exception e) { + logger.error("Error updating {} profiles for past event system properties", batch.size(), e); + } finally { + batch.clear(); + } + } + } + return profileUpdatedCount; } private String getMD5(String md5) { @@ -1115,7 +1132,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe if (action.getActionTypeId().equals("setEventOccurenceCountAction")) { Condition pastEventCondition = (Condition) action.getParameterValues().get("pastEventCondition"); if (pastEventCondition.containsParameter("numberOfDays")) { - updateExistingProfilesForPastEventCondition(rule.getCondition(), pastEventCondition); + updateExistingProfilesForPastEventCondition(rule.getCondition(), pastEventCondition, false); } } }
