This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new 9785d88 UNOMI-442: fix creation of Segment that contains past event
condition, correctly engage profiles during creation process (#260)
9785d88 is described below
commit 9785d884e7d6420c5efdaf90fe576f4710f98bde
Author: kevan Jahanshahi <[email protected]>
AuthorDate: Wed Mar 10 18:54:10 2021 +0100
UNOMI-442: fix creation of Segment that contains past event condition,
correctly engage profiles during creation process (#260)
---
.../java/org/apache/unomi/itests/SegmentIT.java | 61 ++++++++++++++++++++-
.../services/impl/segments/SegmentServiceImpl.java | 63 ++++++++++++++--------
2 files changed, 99 insertions(+), 25 deletions(-)
diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index fceb70c..2b5abf9 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -17,11 +17,17 @@
package org.apache.unomi.itests;
+import org.apache.unomi.api.Event;
import org.apache.unomi.api.Metadata;
+import org.apache.unomi.api.Profile;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.segments.Segment;
+import org.apache.unomi.api.services.EventService;
+import org.apache.unomi.api.services.ProfileService;
import org.apache.unomi.api.services.SegmentService;
import org.apache.unomi.api.exceptions.BadSegmentConditionException;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -34,6 +40,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.Date;
import java.util.List;
@RunWith(PaxExam.class)
@@ -42,15 +51,30 @@ public class SegmentIT extends BaseIT {
private final static Logger LOGGER =
LoggerFactory.getLogger(SegmentIT.class);
private final static String SEGMENT_ID = "test-segment-id-2";
- @Inject
- @Filter(timeout = 600000)
+ @Inject @Filter(timeout = 600000)
protected SegmentService segmentService;
+ @Inject @Filter(timeout = 600000)
+ protected ProfileService profileService;
+
+ @Inject @Filter(timeout = 600000)
+ protected EventService eventService;
+
+ @Inject @Filter(timeout = 600000)
+ protected PersistenceService persistenceService;
+
@Before
public void setUp() throws InterruptedException {
removeItems(Segment.class);
}
+ @After
+ public void tearDown() throws InterruptedException {
+ removeItems(Segment.class);
+ removeItems(Profile.class);
+ removeItems(Event.class);
+ }
+
@Test
public void testSegments() {
Assert.assertNotNull("Segment service should be available",
segmentService);
@@ -111,4 +135,37 @@ public class SegmentIT extends BaseIT {
segmentService.removeSegmentDefinition(SEGMENT_ID, false);
}
+
+ @Test
+ public void testSegmentWithPastEventCondition() throws
InterruptedException {
+ // create Profile
+ Profile profile = new Profile();
+ profile.setItemId("test_profile_id");
+ profileService.save(profile);
+ persistenceService.refreshIndex(Profile.class, null); // wait for
profile to be fully persisted and indexed
+
+ // send event for profile from a previous date (today -3 days)
+ ZoneId defaultZoneId = ZoneId.systemDefault();
+ LocalDate localDate = LocalDate.now().minusDays(3);
+ Event testEvent = new Event("test-event-type", null, profile, null,
null, profile, Date.from(localDate.atStartOfDay(defaultZoneId).toInstant()));
+ testEvent.setPersistent(true);
+ eventService.send(testEvent);
+ persistenceService.refreshIndex(Event.class, null); // wait for event
to be fully persisted and indexed
+
+ // create the segment
+ Metadata segmentMetadata = new Metadata("past-event-segment-test");
+ Segment segment = new Segment(segmentMetadata);
+ Condition segmentCondition = new
Condition(definitionsService.getConditionType("pastEventCondition"));
+ segmentCondition.setParameter("numberOfDays", 10);
+ Condition pastEventEventCondition = new
Condition(definitionsService.getConditionType("eventTypeCondition"));
+ pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+ segmentCondition.setParameter("eventCondition",
pastEventEventCondition);
+ segment.setCondition(segmentCondition);
+ segmentService.setSegmentDefinition(segment);
+
+ // ensure the profile that matched the past event condition is correctly
engaged in the segment.
+ Thread.sleep(5000);
+ profile = profileService.load("test_profile_id");
+ Assert.assertTrue("Profile should be engaged in the segment",
profile.getSegments().contains("past-event-segment-test"));
+ }
}
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 7d8c2ce..2f159b0 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -776,7 +776,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
rule.setLinkedItems(Arrays.asList(metadata.getId()));
rules.add(rule);
- updateExistingProfilesForPastEventCondition(condition,
parentCondition);
+ updateExistingProfilesForPastEventCondition(condition,
parentCondition, true);
} else {
rule.getLinkedItems().add(metadata.getId());
rules.add(rule);
@@ -798,7 +798,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
}
}
- private void updateExistingProfilesForPastEventCondition(Condition
eventCondition, Condition parentCondition) {
+ private void updateExistingProfilesForPastEventCondition(Condition
eventCondition, Condition parentCondition, boolean forceRefresh) {
long t = System.currentTimeMillis();
List<Condition> l = new ArrayList<Condition>();
Condition andCondition = new Condition();
@@ -839,20 +839,25 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
String propertyKey = (String)
parentCondition.getParameter("generatedPropertyKey");
+ int updatedProfileCount = 0;
if(pastEventsDisablePartitions) {
Map<String, Long> eventCountByProfile =
persistenceService.aggregateWithOptimizedQuery(eventCondition, new
TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
- updateProfilesWithPastEventProperty(eventCountByProfile,
propertyKey);
+ updatedProfileCount =
updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
} else {
Map<String, Double> m =
persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"},
"profileId.keyword", Event.ITEM_TYPE);
long card = m.get("_card").longValue();
int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile =
persistenceService.aggregateWithOptimizedQuery(andCondition, new
TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
- updateProfilesWithPastEventProperty(eventCountByProfile,
propertyKey);
+ updatedProfileCount +=
updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey);
}
}
- logger.info("Profiles past condition updated in {}ms",
System.currentTimeMillis() - t);
+ if (forceRefresh && updatedProfileCount > 0) {
+ persistenceService.refreshIndex(Profile.class, null);
+ }
+
+ logger.info("{} profiles updated for past event condition in {}ms",
updatedProfileCount, System.currentTimeMillis() - t);
}
public String getGeneratedPropertyKey(Condition condition, Condition
parentCondition) {
@@ -878,25 +883,37 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
}
}
- private void updateProfilesWithPastEventProperty(Map<String, Long>
eventCountByProfile, String propertyKey) {
- for (Map.Entry<String, Long> entry :
eventCountByProfile.entrySet()) {
- String profileId = entry.getKey();
- if (!profileId.startsWith("_")) {
- Map<String, Long> pastEventCounts = new HashMap<>();
- pastEventCounts.put(propertyKey, entry.getValue());
- Map<String, Object> systemProperties = new HashMap<>();
- systemProperties.put("pastEvents", pastEventCounts);
- try {
- systemProperties.put("lastUpdated", new Date());
- Profile profile = new Profile();
- profile.setItemId(profileId);
- persistenceService.update(profile, null,
Profile.class, "systemProperties", systemProperties);
- } catch (Exception e) {
- logger.error("Error updating profile {} past event
system properties", profileId, e);
- }
- }
+ private int updateProfilesWithPastEventProperty(Map<String, Long>
eventCountByProfile, String propertyKey) {
+ int profileUpdatedCount = 0;
+ Map<Item, Map> batch = new HashMap<>();
+ Iterator<Map.Entry<String, Long>> entryIterator =
eventCountByProfile.entrySet().iterator();
+ while (entryIterator.hasNext()){
+ Map.Entry<String, Long> entry = entryIterator.next();
+ String profileId = entry.getKey();
+ if (!profileId.startsWith("_")) {
+ Map<String, Long> pastEventCounts = new HashMap<>();
+ pastEventCounts.put(propertyKey, entry.getValue());
+ Map<String, Object> systemProperties = new HashMap<>();
+ systemProperties.put("pastEvents", pastEventCounts);
+ systemProperties.put("lastUpdated", new Date());
+
+ Profile profile = new Profile();
+ profile.setItemId(profileId);
+ batch.put(profile,
Collections.singletonMap("systemProperties", systemProperties));
}
+ if (batch.size() == segmentUpdateBatchSize ||
(!entryIterator.hasNext() && batch.size() > 0)) {
+ try {
+ persistenceService.update(batch, null, Profile.class);
+ profileUpdatedCount += batch.size();
+ } catch (Exception e) {
+ logger.error("Error updating {} profiles for past event
system properties", batch.size(), e);
+ } finally {
+ batch.clear();
+ }
+ }
+ }
+ return profileUpdatedCount;
}
private String getMD5(String md5) {
@@ -1115,7 +1132,7 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
if
(action.getActionTypeId().equals("setEventOccurenceCountAction")) {
Condition pastEventCondition = (Condition)
action.getParameterValues().get("pastEventCondition");
if
(pastEventCondition.containsParameter("numberOfDays")) {
-
updateExistingProfilesForPastEventCondition(rule.getCondition(),
pastEventCondition);
+
updateExistingProfilesForPastEventCondition(rule.getCondition(),
pastEventCondition, false);
}
}
}