This is an automated email from the ASF dual-hosted git repository.
dgriffon pushed a commit to branch unomi-1.6.x
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-1.6.x by this push:
new 83a50f9 UNOMI-527 : fix duplicated linkedItems (#364)
83a50f9 is described below
commit 83a50f951243cf0a012f38a57f5f202f70b295e6
Author: David Griffon <[email protected]>
AuthorDate: Thu Nov 18 11:37:50 2021 +0100
UNOMI-527 : fix duplicated linkedItems (#364)
* UNOMI-527 : fix duplicated linkedItems
* UNOMI-527 : code review improvements
---
.../main/java/org/apache/unomi/api/rules/Rule.java | 3 +-
.../java/org/apache/unomi/itests/SegmentIT.java | 49 ++++++++++
.../services/impl/segments/SegmentServiceImpl.java | 104 ++++++++++-----------
3 files changed, 100 insertions(+), 56 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/rules/Rule.java
b/api/src/main/java/org/apache/unomi/api/rules/Rule.java
index d3a44cb..0aedfb1 100644
--- a/api/src/main/java/org/apache/unomi/api/rules/Rule.java
+++ b/api/src/main/java/org/apache/unomi/api/rules/Rule.java
@@ -22,6 +22,7 @@ import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.conditions.Condition;
import java.util.List;
+import java.util.stream.Collectors;
/**
* A conditional set of actions to be executed in response to incoming events.
Triggering of rules is guarded by a condition: the rule is only triggered if
the associated
@@ -122,7 +123,7 @@ public class Rule extends MetadataItem {
* @param linkedItems the linked items
*/
public void setLinkedItems(List<String> linkedItems) {
- this.linkedItems = linkedItems;
+ this.linkedItems = linkedItems == null ? null :
linkedItems.stream().distinct().collect(Collectors.toList());
}
/**
diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index bbb0354..9a776f3 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -21,6 +21,7 @@ import org.apache.unomi.api.Event;
import org.apache.unomi.api.Metadata;
import org.apache.unomi.api.Profile;
import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.rules.Rule;
import org.apache.unomi.api.segments.Scoring;
import org.apache.unomi.api.segments.ScoringElement;
import org.apache.unomi.api.segments.Segment;
@@ -358,4 +359,52 @@ public class SegmentIT extends BaseIT {
updatedProfile ->
!updatedProfile.getScores().containsKey("past-event-scoring-test"),
1000, 20);
}
+
+ @Test
+ public void testLinkedItems() throws Exception {
+
+ // create the past event condition
+ Condition pastEventCondition = new
Condition(definitionsService.getConditionType("pastEventCondition"));
+ pastEventCondition.setParameter("minimumEventCount", 1);
+ pastEventCondition.setParameter("maximumEventCount", 2);
+
+ pastEventCondition.setParameter("fromDate", "2000-07-15T07:00:00Z");
+ pastEventCondition.setParameter("toDate", "2001-01-15T07:00:00Z");
+ ;
+ Condition pastEventEventCondition = new
Condition(definitionsService.getConditionType("eventTypeCondition"));
+ pastEventEventCondition.setParameter("eventTypeId", "test-event-type");
+ pastEventCondition.setParameter("eventCondition",
pastEventEventCondition);
+
+ // create the scoring
+ Metadata scoringMetadata = new Metadata("past-event-scoring-test");
+ Scoring scoring = new Scoring(scoringMetadata);
+ List<ScoringElement> scoringElements = new ArrayList<>();
+ ScoringElement scoringElement = new ScoringElement();
+ scoringElement.setCondition(pastEventCondition);
+ scoringElement.setValue(50);
+ scoringElements.add(scoringElement);
+ scoring.setElements(scoringElements);
+ segmentService.setScoringDefinition(scoring);
+ refreshPersistence();
+ // Check linkedItems
+ List<Rule> rules = persistenceService.getAllItems(Rule.class);
+ Rule scoringRule = rules.stream().filter(rule ->
rule.getItemId().equals(pastEventCondition.getParameter("generatedPropertyKey"))).findFirst().get();
+ Assert.assertEquals("Scoring linked Item should be one", 1,
scoringRule.getLinkedItems().size());
+
+ // save the scoring once again
+ segmentService.setScoringDefinition(scoring);
+ refreshPersistence();
+ // Check linkedItems
+ rules = persistenceService.getAllItems(Rule.class);
+ scoringRule = rules.stream().filter(rule ->
rule.getItemId().equals(pastEventCondition.getParameter("generatedPropertyKey"))).findFirst().get();
+ Assert.assertEquals("Scoring linked Item should be one", 1,
scoringRule.getLinkedItems().size());
+
+ // Remove scoring
+ segmentService.removeSegmentDefinition(scoring.getItemId(), true);
+ refreshPersistence();
+ // Check linkedItems
+ rules = persistenceService.getAllItems(Rule.class);
+ boolean isRule = rules.stream().anyMatch(rule ->
rule.getItemId().equals(pastEventCondition.getParameter("generatedPropertyKey")));
+ Assert.assertFalse("Rule is properly removed", isRule);
+ }
}
\ No newline at end of file
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 6e3592a..2f4e0f2 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -143,7 +143,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
this.batchSegmentProfileUpdate = batchSegmentProfileUpdate;
}
- public void setSendProfileUpdateEventForSegmentUpdate(boolean
sendProfileUpdateEventForSegmentUpdate){
+ public void setSendProfileUpdateEventForSegmentUpdate(boolean
sendProfileUpdateEventForSegmentUpdate) {
this.sendProfileUpdateEventForSegmentUpdate =
sendProfileUpdateEventForSegmentUpdate;
}
@@ -295,8 +295,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
private boolean checkSegmentDeletionImpact(Condition condition, String
segmentToDeleteId) {
if (condition != null) {
- @SuppressWarnings("unchecked")
- final List<Condition> subConditions = (List<Condition>)
condition.getParameter("subConditions");
+ @SuppressWarnings("unchecked") final List<Condition> subConditions
= (List<Condition>) condition.getParameter("subConditions");
if (subConditions != null) {
for (Condition subCondition : subConditions) {
if (checkSegmentDeletionImpact(subCondition,
segmentToDeleteId)) {
@@ -304,8 +303,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
}
}
} else if
("profileSegmentCondition".equals(condition.getConditionTypeId())) {
- @SuppressWarnings("unchecked")
- final List<String> referencedSegmentIds = (List<String>)
condition.getParameter("segments");
+ @SuppressWarnings("unchecked") final List<String>
referencedSegmentIds = (List<String>) condition.getParameter("segments");
if (referencedSegmentIds.indexOf(segmentToDeleteId) >= 0) {
return true;
@@ -326,8 +324,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
*/
private Condition updateSegmentDependentCondition(Condition condition,
String segmentId) {
if ("booleanCondition".equals(condition.getConditionTypeId())) {
- @SuppressWarnings("unchecked")
- final List<Condition> subConditions = (List<Condition>)
condition.getParameter("subConditions");
+ @SuppressWarnings("unchecked") final List<Condition> subConditions
= (List<Condition>) condition.getParameter("subConditions");
List<Condition> updatedSubConditions = new LinkedList<>();
for (Condition subCondition : subConditions) {
Condition updatedCondition =
updateSegmentDependentCondition(subCondition, segmentId);
@@ -346,8 +343,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
return null;
}
} else if
("profileSegmentCondition".equals(condition.getConditionTypeId())) {
- @SuppressWarnings("unchecked")
- final List<String> referencedSegmentIds = (List<String>)
condition.getParameter("segments");
+ @SuppressWarnings("unchecked") final List<String>
referencedSegmentIds = (List<String>) condition.getParameter("segments");
if (referencedSegmentIds.indexOf(segmentId) >= 0) {
referencedSegmentIds.remove(segmentId);
if (referencedSegmentIds.isEmpty()) {
@@ -411,8 +407,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
long profileRemovalStartTime = System.currentTimeMillis();
if (batchSegmentProfileUpdate && previousProfiles.size() > 0) {
batchUpdateProfilesSegment(segmentId, previousProfiles, false);
- }
- else {
+ } else {
for (Profile profileToRemove : previousProfiles) {
Map<String, Object> sourceMap =
buildPropertiesMapForUpdateSegment(profileToRemove, segmentId, false);
persistenceService.update(profileToRemove, null,
Profile.class, sourceMap);
@@ -587,17 +582,17 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
persistenceService.save(scoring);
persistenceService.createMapping(Profile.ITEM_TYPE, String.format(
- "{\n" +
- " \"properties\": {\n" +
- " \"scores\": {\n" +
- " \"properties\": {\n" +
- " \"%s\": {\n" +
- " \"type\":\"long\"\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- "}", scoring.getItemId()));
+ "{\n" +
+ " \"properties\": {\n" +
+ " \"scores\": {\n" +
+ " \"properties\": {\n" +
+ " \"%s\": {\n" +
+ " \"type\":\"long\"\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ "}", scoring.getItemId()));
updateExistingProfilesForScoring(scoring.getItemId(),
scoring.getElements(), scoring.getMetadata().isEnabled());
}
@@ -616,8 +611,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
private boolean checkScoringDeletionImpact(Condition condition, String
scoringToDeleteId) {
if (condition != null) {
- @SuppressWarnings("unchecked")
- final List<Condition> subConditions = (List<Condition>)
condition.getParameter("subConditions");
+ @SuppressWarnings("unchecked") final List<Condition> subConditions
= (List<Condition>) condition.getParameter("subConditions");
if (subConditions != null) {
for (Condition subCondition : subConditions) {
if (checkScoringDeletionImpact(subCondition,
scoringToDeleteId)) {
@@ -644,8 +638,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
*/
private Condition updateScoringDependentCondition(Condition condition,
String scoringId) {
if ("booleanCondition".equals(condition.getConditionTypeId())) {
- @SuppressWarnings("unchecked")
- final List<Condition> subConditions = (List<Condition>)
condition.getParameter("subConditions");
+ @SuppressWarnings("unchecked") final List<Condition> subConditions
= (List<Condition>) condition.getParameter("subConditions");
List<Condition> updatedSubConditions = new LinkedList<>();
for (Condition subCondition : subConditions) {
Condition updatedCondition =
updateScoringDependentCondition(subCondition, scoringId);
@@ -772,7 +765,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
private void clearAutoGeneratedRules(List<Rule> rules, String idWithScope)
{
for (Rule previousRule : rules) {
- previousRule.getLinkedItems().remove(idWithScope);
+
previousRule.getLinkedItems().removeAll(Collections.singleton(idWithScope));
if (previousRule.getLinkedItems().isEmpty()) {
// todo remove profile properties ?
persistenceService.remove(previousRule.getItemId(),
Rule.class);
@@ -799,15 +792,14 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
rule.setActions(Arrays.asList(action));
rule.setLinkedItems(Arrays.asList(metadata.getId()));
- rules.add(rule);
// it's a new generated rules to keep track of the event
count, we should update all the profile that match this past event
// it will update the count of event occurrence on the
profile directly
recalculatePastEventOccurrencesOnProfiles(condition,
parentCondition, true, false);
- } else {
+ } else if (!rule.getLinkedItems().contains(metadata.getId())) {
rule.getLinkedItems().add(metadata.getId());
- rules.add(rule);
}
+ rules.add(rule);
}
} else {
Collection<Object> values = new
ArrayList<>(condition.getParameterValues().values());
@@ -827,16 +819,17 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
/**
* This will recalculate the event counts on the profiles that match the
given past event condition
- * @param eventCondition the real condition
- * @param parentCondition the past event condition
- * @param forceRefresh will refresh the Profile index in case it's true
+ *
+ * @param eventCondition the real condition
+ * @param parentCondition the past event condition
+ * @param forceRefresh will refresh the Profile index
in case it's true
* @param resetExistingProfilesNotMatching if true, will reset existing
profiles having a count to 0, in case they do not have events matching anymore
* ("false" can be useful when you
know that no existing profiles already exist because it's a new rule for
example,
* in that case setting this to
"false" allow to skip profiles queries and speedup this process.
* Otherwise use "true" here to be
sure the count is reset to 0 on profiles that need to be reset)
*/
private void recalculatePastEventOccurrencesOnProfiles(Condition
eventCondition, Condition parentCondition,
- boolean
forceRefresh, boolean resetExistingProfilesNotMatching) {
+ boolean
forceRefresh, boolean resetExistingProfilesNotMatching) {
long t = System.currentTimeMillis();
List<Condition> l = new ArrayList<Condition>();
Condition andCondition = new Condition();
@@ -858,7 +851,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
numberOfDaysCondition.setParameter("propertyValue", "now-" +
numberOfDays + "d");
l.add(numberOfDaysCondition);
}
- if (fromDate != null) {
+ if (fromDate != null) {
Condition startDateCondition = new Condition();
startDateCondition.setConditionType(definitionsService.getConditionType("sessionPropertyCondition"));
startDateCondition.setParameter("propertyName", "timeStamp");
@@ -866,7 +859,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
startDateCondition.setParameter("propertyValueDate", fromDate);
l.add(startDateCondition);
}
- if (toDate != null) {
+ if (toDate != null) {
Condition endDateCondition = new Condition();
endDateCondition.setConditionType(definitionsService.getConditionType("sessionPropertyCondition"));
endDateCondition.setParameter("propertyName", "timeStamp");
@@ -879,9 +872,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
Set<String> existingProfilesWithCounts =
resetExistingProfilesNotMatching ?
getExistingProfilesWithPastEventOccurrenceCount(propertyKey) :
Collections.emptySet();
int updatedProfileCount = 0;
- if(pastEventsDisablePartitions) {
+ if (pastEventsDisablePartitions) {
Map<String, Long> eventCountByProfile =
persistenceService.aggregateWithOptimizedQuery(eventCondition, new
TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount);
- Set<String> updatedProfiles =
updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+ Set<String> updatedProfiles =
updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
existingProfilesWithCounts.removeAll(updatedProfiles);
updatedProfileCount = updatedProfiles.size();
} else {
@@ -890,7 +883,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile =
persistenceService.aggregateWithOptimizedQuery(andCondition, new
TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
- Set<String> updatedProfiles =
updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
+ Set<String> updatedProfiles =
updatePastEventOccurrencesOnProfiles(eventCountByProfile, propertyKey);
existingProfilesWithCounts.removeAll(updatedProfiles);
updatedProfileCount += updatedProfiles.size();
}
@@ -912,6 +905,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
/**
* Return the list of profile ids, for profiles that already have an event
count matching the generated property key
+ *
* @param generatedPropertyKey the generated property key of the generated
rule for the given past event condition.
* @return the list of profile ids.
*/
@@ -923,7 +917,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
countExistsCondition.setParameter("propertyValueInteger", 0);
Set<String> profileIds = new HashSet<>();
- if(pastEventsDisablePartitions) {
+ if (pastEventsDisablePartitions) {
profileIds.addAll(persistenceService.aggregateWithOptimizedQuery(countExistsCondition,
new TermsAggregate("itemId"),
Profile.ITEM_TYPE, maximumIdsQueryCount).keySet());
} else {
@@ -956,7 +950,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl
implements SegmentSe
String key =
CustomObjectMapper.getObjectMapper().writeValueAsString(m);
return "eventTriggered" + getMD5(key);
} catch (JsonProcessingException e) {
- logger.error("Cannot generate key",e);
+ logger.error("Cannot generate key", e);
return null;
}
}
@@ -1005,15 +999,16 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
/**
* This will update all the profiles in the given map with the according
new count occurrence for the given propertyKey
+ *
* @param eventCountByProfile the events count per profileId map
- * @param propertyKey the generate property key for this past event
condition, to keep track of the count in the profile
+ * @param propertyKey the generated property key for this past
event condition, to keep track of the count in the profile
* @return the list of profiles for witch the count of event occurrences
have been updated.
*/
private Set<String> updatePastEventOccurrencesOnProfiles(Map<String, Long>
eventCountByProfile, String propertyKey) {
Set<String> profilesUpdated = new HashSet<>();
Map<Item, Map> batch = new HashMap<>();
Iterator<Map.Entry<String, Long>> entryIterator =
eventCountByProfile.entrySet().iterator();
- while (entryIterator.hasNext()){
+ while (entryIterator.hasNext()) {
Map.Entry<String, Long> entry = entryIterator.next();
String profileId = entry.getKey();
if (!profileId.startsWith("_")) {
@@ -1098,22 +1093,21 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
logger.info("{} profiles updated in {}ms", updatedProfileCount,
System.currentTimeMillis() - updateProfilesForSegmentStartTime);
}
- private long updateProfilesSegment(Condition profilesToUpdateCondition,
String segmentId, boolean isAdd){
- long updatedProfileCount= 0;
+ private long updateProfilesSegment(Condition profilesToUpdateCondition,
String segmentId, boolean isAdd) {
+ long updatedProfileCount = 0;
PartialList<Profile> profiles =
persistenceService.query(profilesToUpdateCondition, null, Profile.class, 0,
segmentUpdateBatchSize, "10m");
while (profiles != null && profiles.getList().size() > 0) {
long startTime = System.currentTimeMillis();
if (batchSegmentProfileUpdate) {
batchUpdateProfilesSegment(segmentId, profiles.getList(),
isAdd);
- }
- else { //send update profile one by one
+ } else { //send update profile one by one
for (Profile profileToUpdate : profiles.getList()) {
Map<String, Object> sourceMap =
buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
persistenceService.update(profileToUpdate, null,
Profile.class, sourceMap);
}
}
- if (sendProfileUpdateEventForSegmentUpdate)
+ if (sendProfileUpdateEventForSegmentUpdate)
sendProfileUpdatedEvent(profiles.getList());
updatedProfileCount += profiles.size();
@@ -1128,7 +1122,7 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
private void batchUpdateProfilesSegment(String segmentId, List<Profile>
profiles, boolean isAdd) {
Map<Item, Map> profileToPropertiesMap = new HashMap<>();
for (Profile profileToUpdate : profiles) {
- Map<String,Object> propertiesToUpdate =
buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
+ Map<String, Object> propertiesToUpdate =
buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
profileToPropertiesMap.put(profileToUpdate, propertiesToUpdate);
}
List<String> failedItemsIds =
persistenceService.update(profileToPropertiesMap, null, Profile.class);
@@ -1136,8 +1130,8 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
failedItemsIds.forEach(s -> retryFailedSegmentUpdate(s, segmentId,
isAdd));
}
- private void retryFailedSegmentUpdate(String profileId, String segmentId,
boolean isAdd){
- if (maxRetriesForUpdateProfileSegment > 0){
+ private void retryFailedSegmentUpdate(String profileId, String segmentId,
boolean isAdd) {
+ if (maxRetriesForUpdateProfileSegment > 0) {
RetryPolicy retryPolicy = new RetryPolicy()
.withDelay(Duration.ofSeconds(secondsDelayForRetryUpdateProfileSegment))
.withMaxRetries(maxRetriesForUpdateProfileSegment);
@@ -1161,9 +1155,9 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
}
private void sendProfileUpdatedEvent(Profile profile) {
- Event profileUpdated = new Event("profileUpdated", null, profile,
null, null, profile, new Date());
- profileUpdated.setPersistent(false);
- eventService.send(profileUpdated);
+ Event profileUpdated = new Event("profileUpdated", null, profile,
null, null, profile, new Date());
+ profileUpdated.setPersistent(false);
+ eventService.send(profileUpdated);
}
private Map<String, Object> buildPropertiesMapForUpdateSegment(Profile
profile, String segmentId, boolean isAdd) {
@@ -1267,7 +1261,7 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
long initialDelay =
SchedulerServiceImpl.getTimeDiffInSeconds(dailyDateExprEvaluationHourUtc,
ZonedDateTime.now(ZoneOffset.UTC));
logger.info("daily DateExpr segments will run at fixed rate,
initialDelay={}, taskExecutionPeriod={}, ", initialDelay,
TimeUnit.DAYS.toSeconds(1));
-
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task,
initialDelay, TimeUnit.DAYS.toSeconds(1), TimeUnit.SECONDS);
+
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task,
initialDelay, TimeUnit.DAYS.toSeconds(1), TimeUnit.SECONDS);
}
private void loadScripts() throws IOException {