This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch UNOMI-442-segment-past-event
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit 92ae092d4793591a0d375537e0fb8f6b62151447
Author: Kevan <[email protected]>
AuthorDate: Wed Mar 10 16:03:02 2021 +0100

    UNOMI-442: fix creation of Segment that contains past event condition, 
correctly engage profiles during creation process
---
 .../java/org/apache/unomi/itests/SegmentIT.java    | 61 ++++++++++++++++++++-
 .../services/impl/segments/SegmentServiceImpl.java | 63 ++++++++++++++--------
 2 files changed, 99 insertions(+), 25 deletions(-)

diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java 
b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index fceb70c..2b5abf9 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -17,11 +17,17 @@
 
 package org.apache.unomi.itests;
 
+import org.apache.unomi.api.Event;
 import org.apache.unomi.api.Metadata;
+import org.apache.unomi.api.Profile;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.segments.Segment;
+import org.apache.unomi.api.services.EventService;
+import org.apache.unomi.api.services.ProfileService;
 import org.apache.unomi.api.services.SegmentService;
 import org.apache.unomi.api.exceptions.BadSegmentConditionException;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,6 +40,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.Date;
 import java.util.List;
 
 @RunWith(PaxExam.class)
@@ -42,15 +51,30 @@ public class SegmentIT extends BaseIT {
     private final static Logger LOGGER = 
LoggerFactory.getLogger(SegmentIT.class);
     private final static String SEGMENT_ID = "test-segment-id-2";
 
-    @Inject
-    @Filter(timeout = 600000)
+    @Inject @Filter(timeout = 600000)
     protected SegmentService segmentService;
 
+    @Inject @Filter(timeout = 600000)
+    protected ProfileService profileService;
+
+    @Inject @Filter(timeout = 600000)
+    protected EventService eventService;
+
+    @Inject @Filter(timeout = 600000)
+    protected PersistenceService persistenceService;
+
     @Before
     public void setUp() throws InterruptedException {
         removeItems(Segment.class);
     }
 
+    @After
+    public void tearDown() throws InterruptedException {
+        removeItems(Segment.class);
+        removeItems(Profile.class);
+        removeItems(Event.class);
+    }
+
     @Test
     public void testSegments() {
         Assert.assertNotNull("Segment service should be available", 
segmentService);
@@ -111,4 +135,37 @@ public class SegmentIT extends BaseIT {
 
         segmentService.removeSegmentDefinition(SEGMENT_ID, false);
     }
+
+    @Test
+    public void testSegmentWithPastEventCondition() throws 
InterruptedException {
+        // create Profile
+        Profile profile = new Profile();
+        profile.setItemId("test_profile_id");
+        profileService.save(profile);
+        persistenceService.refreshIndex(Profile.class, null); // wait for 
profile to be fully persisted and indexed
+
+        // send event for profile from a previous date (today -3 days)
+        ZoneId defaultZoneId = ZoneId.systemDefault();
+        LocalDate localDate = LocalDate.now().minusDays(3);
+        Event testEvent = new Event("test-event-type", null, profile, null, 
null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+        testEvent.setPersistent(true);
+        eventService.send(testEvent);
+        persistenceService.refreshIndex(Event.class, null); // wait for event 
to be fully persisted and indexed
+
+        // create the segment
+        Metadata segmentMetadata = new Metadata("past-event-segment-test");
+        Segment segment = new Segment(segmentMetadata);
+        Condition segmentCondition = new 
Condition(definitionsService.getConditionType("pastEventCondition"));
+        segmentCondition.setParameter("numberOfDays", 10);
+        Condition pastEventEventCondition = new 
Condition(definitionsService.getConditionType("eventTypeCondition"));
+        pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+        segmentCondition.setParameter("eventCondition", 
pastEventEventCondition);
+        segment.setCondition(segmentCondition);
+        segmentService.setSegmentDefinition(segment);
+
+        // ensure the profile that did the past event condition is correctly 
engaged in the segment.
+        Thread.sleep(5000);
+        profile = profileService.load("test_profile_id");
+        Assert.assertTrue("Profile should be engaged in the segment", 
profile.getSegments().contains("past-event-segment-test"));
+    }
 }
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 7d8c2ce..2f159b0 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -776,7 +776,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl 
implements SegmentSe
                     rule.setLinkedItems(Arrays.asList(metadata.getId()));
                     rules.add(rule);
 
-                    updateExistingProfilesForPastEventCondition(condition, 
parentCondition);
+                    updateExistingProfilesForPastEventCondition(condition, 
parentCondition, true);
                 } else {
                     rule.getLinkedItems().add(metadata.getId());
                     rules.add(rule);
@@ -798,7 +798,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl 
implements SegmentSe
         }
     }
 
-    private void updateExistingProfilesForPastEventCondition(Condition 
eventCondition, Condition parentCondition) {
+    private void updateExistingProfilesForPastEventCondition(Condition 
eventCondition, Condition parentCondition, boolean forceRefresh) {
         long t = System.currentTimeMillis();
         List<Condition> l = new ArrayList<Condition>();
         Condition andCondition = new Condition();
@@ -839,20 +839,25 @@ public class SegmentServiceImpl extends 
AbstractServiceImpl implements SegmentSe
 
         String propertyKey = (String) 
parentCondition.getParameter("generatedPropertyKey");
 
+        int updatedProfileCount = 0;
         if(pastEventsDisablePartitions) {
             Map<String, Long> eventCountByProfile = 
persistenceService.aggregateWithOptimizedQuery(eventCondition, new 
TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
-            updateProfilesWithPastEventProperty(eventCountByProfile, 
propertyKey);
+            updatedProfileCount = 
updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
         } else {
             Map<String, Double> m = 
persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, 
"profileId.keyword", Event.ITEM_TYPE);
             long card = m.get("_card").longValue();
             int numParts = (int) (card / aggregateQueryBucketSize) + 2;
             for (int i = 0; i < numParts; i++) {
                 Map<String, Long> eventCountByProfile = 
persistenceService.aggregateWithOptimizedQuery(andCondition, new 
TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
-                updateProfilesWithPastEventProperty(eventCountByProfile, 
propertyKey);
+                updatedProfileCount += 
updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
             }
         }
 
-        logger.info("Profiles past condition updated in {}ms", 
System.currentTimeMillis() - t);
+        if (forceRefresh && updatedProfileCount > 0) {
+            persistenceService.refreshIndex(Profile.class, null);
+        }
+
+        logger.info("{} profiles updated for past event condition in {}ms", 
updatedProfileCount, System.currentTimeMillis() - t);
     }
 
     public String getGeneratedPropertyKey(Condition condition, Condition 
parentCondition) {
@@ -878,25 +883,37 @@ public class SegmentServiceImpl extends 
AbstractServiceImpl implements SegmentSe
         }
     }
 
-    private void updateProfilesWithPastEventProperty(Map<String, Long> 
eventCountByProfile, String propertyKey) {
-            for (Map.Entry<String, Long> entry : 
eventCountByProfile.entrySet()) {
-                String profileId = entry.getKey();
-                if (!profileId.startsWith("_")) {
-                    Map<String, Long> pastEventCounts = new HashMap<>();
-                    pastEventCounts.put(propertyKey, entry.getValue());
-                    Map<String, Object> systemProperties = new HashMap<>();
-                    systemProperties.put("pastEvents", pastEventCounts);
-                    try {
-                        systemProperties.put("lastUpdated", new Date());
-                        Profile profile = new Profile();
-                        profile.setItemId(profileId);
-                        persistenceService.update(profile, null, 
Profile.class, "systemProperties", systemProperties);
-                    } catch (Exception e) {
-                        logger.error("Error updating profile {} past event 
system properties", profileId, e);
-                    }
-                }
+    private int updateProfilesWithPastEventProperty(Map<String, Long> 
eventCountByProfile, String propertyKey) {
+        int profileUpdatedCount = 0;
+        Map<Item, Map> batch = new HashMap<>();
+        Iterator<Map.Entry<String, Long>> entryIterator = 
eventCountByProfile.entrySet().iterator();
+        while (entryIterator.hasNext()){
+            Map.Entry<String, Long> entry = entryIterator.next();
+            String profileId = entry.getKey();
+            if (!profileId.startsWith("_")) {
+                Map<String, Long> pastEventCounts = new HashMap<>();
+                pastEventCounts.put(propertyKey, entry.getValue());
+                Map<String, Object> systemProperties = new HashMap<>();
+                systemProperties.put("pastEvents", pastEventCounts);
+                systemProperties.put("lastUpdated", new Date());
+
+                Profile profile = new Profile();
+                profile.setItemId(profileId);
+                batch.put(profile, 
Collections.singletonMap("systemProperties", systemProperties));
             }
 
+            if (batch.size() == segmentUpdateBatchSize || 
(!entryIterator.hasNext() && batch.size() > 0)) {
+                try {
+                    persistenceService.update(batch, null, Profile.class);
+                    profileUpdatedCount += batch.size();
+                } catch (Exception e) {
+                    logger.error("Error updating {} profiles for past event 
system properties", batch.size(), e);
+                } finally {
+                    batch.clear();
+                }
+            }
+        }
+        return profileUpdatedCount;
     }
 
     private String getMD5(String md5) {
@@ -1115,7 +1132,7 @@ public class SegmentServiceImpl extends 
AbstractServiceImpl implements SegmentSe
                             if 
(action.getActionTypeId().equals("setEventOccurenceCountAction")) {
                                 Condition pastEventCondition = (Condition) 
action.getParameterValues().get("pastEventCondition");
                                 if 
(pastEventCondition.containsParameter("numberOfDays")) {
-                                    
updateExistingProfilesForPastEventCondition(rule.getCondition(), 
pastEventCondition);
+                                    
updateExistingProfilesForPastEventCondition(rule.getCondition(), 
pastEventCondition, false);
                                 }
                             }
                         }

Reply via email to