This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch UNOMI-727-adapt-merge-rollover
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/UNOMI-727-adapt-merge-rollover 
by this push:
     new 1cfaf0030 UNOMI-727: adapt merge system to rollover (and cleanup too)
1cfaf0030 is described below

commit 1cfaf00305f295f7eeefa85efef58c9a1b9092d5
Author: Kevan <[email protected]>
AuthorDate: Fri Mar 10 17:02:08 2023 +0100

    UNOMI-727: adapt merge system to rollover (and cleanup too)
---
 .../org/apache/unomi/itests/ProfileMergeIT.java    | 216 ++++++++++++++++++---
 .../actions/MergeProfilesOnPropertyAction.java     |  24 +--
 .../services/impl/segments/SegmentServiceImpl.java |   4 +-
 3 files changed, 198 insertions(+), 46 deletions(-)

diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java 
b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
index a5306d9f9..c54d402e9 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
@@ -16,10 +16,7 @@
  */
 package org.apache.unomi.itests;
 
-import org.apache.unomi.api.Event;
-import org.apache.unomi.api.Metadata;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.ProfileAlias;
+import org.apache.unomi.api.*;
 import org.apache.unomi.api.actions.Action;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.rules.Rule;
@@ -49,14 +46,15 @@ public class ProfileMergeIT extends BaseIT {
     private final static String TEST_PROFILE_ID = 
"mergeOnPropertyTestProfileId";
 
     @After
-    public void after() {
+    public void after() throws InterruptedException {
         // cleanup created data
         rulesService.removeRule(TEST_RULE_ID);
+        removeItems(Profile.class, ProfileAlias.class, Event.class, 
Session.class);
     }
 
     @Test
     public void 
testProfileMergeOnPropertyAction_dont_forceEventProfileAsMaster() throws 
InterruptedException {
-        createAndWaitForRule(createMergeOnPropertyRule(false));
+        createAndWaitForRule(createMergeOnPropertyRule(false, "j:nodename"));
 
         // A new profile should be created.
         Assert.assertNotEquals(sendEvent().getProfile().getItemId(), 
TEST_PROFILE_ID);
@@ -64,29 +62,16 @@ public class ProfileMergeIT extends BaseIT {
 
     @Test
     public void testProfileMergeOnPropertyAction_forceEventProfileAsMaster() 
throws InterruptedException {
-        createAndWaitForRule(createMergeOnPropertyRule(true));
+        createAndWaitForRule(createMergeOnPropertyRule(true, "j:nodename"));
 
         // No new profile should be created, instead the profile of the event 
should be used.
         Assert.assertEquals(sendEvent().getProfile().getItemId(), 
TEST_PROFILE_ID);
     }
 
     @Test
-    public void test() throws InterruptedException {
+    public void testProfileMergeOnPropertyAction_simpleMergeAndCheckAlias() 
throws InterruptedException {
         // create rule
-        Condition condition = new 
Condition(definitionsService.getConditionType("eventTypeCondition"));
-        condition.setParameter("eventTypeId", TEST_EVENT_TYPE);
-
-        final Action action = new 
Action(definitionsService.getActionType("mergeProfilesOnPropertyAction"));
-        action.setParameter("mergeProfilePropertyValue", 
"eventProperty::target.properties(email)");
-        action.setParameter("mergeProfilePropertyName", "mergeIdentifier");
-        action.setParameter("forceEventProfileAsMaster", false);
-
-        Rule rule = new Rule();
-        rule.setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, 
"Description"));
-        rule.setCondition(condition);
-        rule.setActions(Collections.singletonList(action));
-
-        createAndWaitForRule(rule);
+        createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
 
         // create master profile
         Profile masterProfile = new Profile();
@@ -115,6 +100,14 @@ public class ProfileMergeIT extends BaseIT {
                 () -> persistenceService.getAllItems(ProfileAlias.class), 
(profileAliases) -> !profileAliases.isEmpty(),
                 DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
 
+        waitForNullValue("Profile with id eventProfileID not removed in the 
required time",
+                () -> persistenceService.load("eventProfileID", Profile.class),
+                DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+        keepTrying("Profile with id eventProfileID should still be accessible 
due to alias",
+                () -> profileService.load("eventProfileID"), Objects::nonNull,
+                DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
         List<ProfileAlias> aliases = persistenceService.query("profileID", 
masterProfile.getItemId(), null, ProfileAlias.class);
 
         Assert.assertFalse(aliases.isEmpty());
@@ -123,6 +116,181 @@ public class ProfileMergeIT extends BaseIT {
         Assert.assertEquals("defaultClientId", aliases.get(0).getClientID());
     }
 
+
+    /**
+     * User switch case, this case can happen when a person (user A) is using 
the same browser session of a previous logged user (user B).
+     * user A will be using user B profile, but when user A is going to login 
by send a merge event, then we will detect that the mergeIdentifier is not the 
same
+     * In this case we will just switch user A profile to:
+     * - a new one, if it's the first time we encounter his own 
mergeIdentifier (TESTED in this scenario)
+     * - a previous one, if we already have a profile in DB with the same 
mergeIdentifier.
+     */
+    @Test
+    public void 
testProfileMergeOnPropertyAction_sessionReassigned_newProfile() throws 
InterruptedException {
+        // create rule
+        createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+        // create master profile
+        Profile masterProfile = new Profile();
+        masterProfile.setItemId("masterProfileID");
+        masterProfile.setProperty("email", "[email protected]");
+        masterProfile.setSystemProperty("mergeIdentifier", 
"[email protected]");
+        profileService.save(masterProfile);
+
+        keepTrying("Profile with id masterProfileID not found in the required 
time", () -> profileService.load("masterProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, 
DEFAULT_TRYING_TRIES);
+
+        // create event profile
+        Profile eventProfile = new Profile();
+        eventProfile.setItemId("eventProfileID");
+        eventProfile.setProperty("email", "[email protected]");
+
+        Session simpleSession = new Session("simpleSession", eventProfile, new 
Date(), null);
+        Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, 
null, null, eventProfile, new Date());
+        eventService.send(event);
+
+        // Session should have been reassign and a new profile should have 
been created ! (We call this user switch case)
+        Assert.assertNotNull(event.getProfile());
+        Assert.assertNotEquals("eventProfileID", 
event.getProfile().getItemId());
+        Assert.assertNotEquals("eventProfileID", event.getProfileId());
+        Assert.assertNotEquals("eventProfileID", 
event.getSession().getProfile().getItemId());
+        Assert.assertNotEquals("eventProfileID", 
event.getSession().getProfileId());
+
+        Assert.assertNotEquals("masterProfileID", 
event.getProfile().getItemId());
+        Assert.assertNotEquals("masterProfileID", event.getProfileId());
+        Assert.assertNotEquals("masterProfileID", 
event.getSession().getProfile().getItemId());
+        Assert.assertNotEquals("masterProfileID", 
event.getSession().getProfileId());
+
+        Assert.assertEquals(event.getSession().getProfileId(), 
event.getProfileId());
+        Assert.assertEquals("[email protected]", 
event.getProfile().getSystemProperties().get("mergeIdentifier"));
+    }
+
+    /**
+     * User switch case, this case can happen when a person (user A) is using 
the same browser session of a previous logged user (user B).
+     * user A will be using user B profile, but when user A is going to login 
by send a merge event, then we will detect that the mergeIdentifier is not the 
same
+     * In this case we will just switch user A profile to:
+     * - a new one, if it's the first time we encounter his own mergeIdentifier
+     * - a previous one, if we already have a profile in DB with the same 
mergeIdentifier. (TESTED in this scenario)
+     */
+    @Test
+    public void 
testProfileMergeOnPropertyAction_sessionReassigned_existingProfile() throws 
InterruptedException {
+        // create rule
+        createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+        // create master profile
+        Profile masterProfile = new Profile();
+        masterProfile.setItemId("masterProfileID");
+        masterProfile.setProperty("email", "[email protected]");
+        masterProfile.setSystemProperty("mergeIdentifier", 
"[email protected]");
+        profileService.save(masterProfile);
+
+        // create a previous existing profile with same mergeIdentifier
+        Profile previousProfile = new Profile();
+        previousProfile.setItemId("previousProfileID");
+        previousProfile.setProperty("email", "[email protected]");
+        previousProfile.setSystemProperty("mergeIdentifier", 
"[email protected]");
+        profileService.save(previousProfile);
+
+        keepTrying("Profile with id masterProfileID not found in the required 
time", () -> profileService.load("masterProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, 
DEFAULT_TRYING_TRIES);
+        keepTrying("Profile with id previousProfileID not found in the 
required time", () -> profileService.load("previousProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, 
DEFAULT_TRYING_TRIES);
+
+        // create event profile
+        Profile eventProfile = new Profile();
+        eventProfile.setItemId("eventProfileID");
+        eventProfile.setProperty("email", "[email protected]");
+
+        Session simpleSession = new Session("simpleSession", eventProfile, new 
Date(), null);
+        Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile, 
null, null, eventProfile, new Date());
+        eventService.send(event);
+
+        // Session should have been reassign and the previous existing profile 
for mergeIdentifier: [email protected] should have been reuse
+        // Session should have been reassign and a new profile should have 
been created ! (We call this user switch case)
+        Assert.assertNotNull(event.getProfile());
+        Assert.assertEquals("previousProfileID", 
event.getProfile().getItemId());
+        Assert.assertEquals("previousProfileID", event.getProfileId());
+        Assert.assertEquals("previousProfileID", 
event.getSession().getProfile().getItemId());
+        Assert.assertEquals("previousProfileID", 
event.getSession().getProfileId());
+
+        Assert.assertEquals(event.getSession().getProfileId(), 
event.getProfileId());
+        Assert.assertEquals("[email protected]", 
event.getProfile().getSystemProperties().get("mergeIdentifier"));
+    }
+
+    /**
+     * In case of merge, existing sessions/events from previous profileId 
should be rewritten to use the new master profileId
+     */
+    @Test
+    public void 
testProfileMergeOnPropertyAction_simpleMergeRewriteExistingSessionsEvents() 
throws InterruptedException {
+        Condition matchAll = new 
Condition(definitionsService.getConditionType("matchAllCondition"));
+        // create rule
+        createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+        // create master profile
+        Profile masterProfile = new Profile();
+        masterProfile.setItemId("masterProfileID");
+        masterProfile.setProperty("email", "[email protected]");
+        masterProfile.setSystemProperty("mergeIdentifier", 
"[email protected]");
+        profileService.save(masterProfile);
+
+        Profile eventProfile = new Profile();
+        eventProfile.setItemId("eventProfileID");
+        eventProfile.setProperty("email", "[email protected]");
+        profileService.save(eventProfile);
+
+        // create 5 sessions and 5 events for master profile.
+        List<Session> sessionsToBeRewritten = new ArrayList<>();
+        List<Event> eventsToBeRewritten = new ArrayList<>();
+        for (int i = 1; i <= 5; i++) {
+            Session sessionToBeRewritten = new Session("simpleSession_"+ i, 
eventProfile, new Date(), null);
+            sessionsToBeRewritten.add(sessionToBeRewritten);
+            Event eventToBeRewritten = new Event("view", sessionToBeRewritten, 
eventProfile, null, null, null, new Date());
+            eventsToBeRewritten.add(eventToBeRewritten);
+
+
+            persistenceService.save(sessionToBeRewritten);
+            persistenceService.save(eventToBeRewritten);
+        }
+        keepTrying("Wait for sessions and events to be persisted", () -> 
persistenceService.queryCount(matchAll, Session.ITEM_TYPE) + 
persistenceService.queryCount(matchAll, Event.ITEM_TYPE),
+                (count) -> count == 10, DEFAULT_TRYING_TIMEOUT, 
DEFAULT_TRYING_TRIES);
+        keepTrying("Profile with id masterProfileID not found in the required 
time", () -> profileService.load("masterProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, 
DEFAULT_TRYING_TRIES);
+        keepTrying("Profile with id eventProfileID not found in the required 
time", () -> profileService.load("eventProfileID"),
+                Objects::nonNull, DEFAULT_TRYING_TIMEOUT, 
DEFAULT_TRYING_TRIES);
+
+        // Trigger the merge
+        Session simpleSession = new Session("simpleSession", eventProfile, new 
Date(), null);
+        Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession, 
eventProfile, null, null, eventProfile, new Date());
+        eventService.send(mergeEvent);
+
+        // Check that master profile is now used:
+        Assert.assertNotNull(mergeEvent.getProfile());
+        Assert.assertEquals("masterProfileID", 
mergeEvent.getProfile().getItemId());
+        Assert.assertEquals("masterProfileID", mergeEvent.getProfileId());
+        Assert.assertEquals("masterProfileID", 
mergeEvent.getSession().getProfile().getItemId());
+        Assert.assertEquals("masterProfileID", 
mergeEvent.getSession().getProfileId());
+        Assert.assertEquals(mergeEvent.getSession().getProfileId(), 
mergeEvent.getProfileId());
+        Assert.assertEquals("[email protected]", 
mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier"));
+
+        // TODO (UNOMI-748): force the bulk processor to push requests
+        persistenceService.refresh();
+
+        // Check sessions/events are correctly rewritten
+        for (Event event : eventsToBeRewritten) {
+            keepTrying("Wait for event: " + event.getItemId() + " profileId to 
be rewritten for masterProfileID",
+                    () -> persistenceService.load(event.getItemId(), 
Event.class),
+                    (loadedEvent) -> 
loadedEvent.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, 
DEFAULT_TRYING_TRIES);
+        }
+        for (Session session : sessionsToBeRewritten) {
+            keepTrying("Wait for session: " + session.getItemId() + " 
profileId to be rewritten for masterProfileID",
+                    () -> persistenceService.load(session.getItemId(), 
Session.class),
+                    (loadedSession) -> 
loadedSession.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, 
DEFAULT_TRYING_TRIES);
+        }
+    }
+
+    /**
+     * Personalization strategy have a specific handling during the merge of 
two profiles
+     * This test is here to ensure this specific behavior is correctly working.
+     */
     @Test
     public void testPersonalizationStrategyStatusMerge() {
         // create some statuses for the tests:
@@ -208,7 +376,7 @@ public class ProfileMergeIT extends BaseIT {
         return testEvent;
     }
 
-    private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster) 
throws InterruptedException {
+    private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster, 
String eventProperty) throws InterruptedException {
         Rule mergeOnPropertyTestRule = new Rule();
         mergeOnPropertyTestRule
                 .setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID, 
"Test rule for testing MergeProfilesOnPropertyAction"));
@@ -218,7 +386,7 @@ public class ProfileMergeIT extends BaseIT {
         mergeOnPropertyTestRule.setCondition(condition);
 
         final Action mergeProfilesOnPropertyAction = new 
Action(definitionsService.getActionType("mergeProfilesOnPropertyAction"));
-        
mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", 
"eventProperty::target.properties(j:nodename)");
+        
mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue", 
"eventProperty::target.properties(" + eventProperty + ")");
         mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyName", 
"mergeIdentifier");
         
mergeProfilesOnPropertyAction.setParameter("forceEventProfileAsMaster", 
forceEventProfileAsMaster);
         
mergeOnPropertyTestRule.setActions(Collections.singletonList(mergeProfilesOnPropertyAction));
diff --git 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 49d9c7cc7..d4404e7d9 100644
--- 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -30,9 +30,7 @@ import org.apache.unomi.persistence.spi.PersistenceService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 public class MergeProfilesOnPropertyAction implements ActionExecutor {
@@ -119,7 +117,10 @@ public class MergeProfilesOnPropertyAction implements 
ActionExecutor {
                     String profileToBeMergeId = profileToBeMerge.getItemId();
                     if (!StringUtils.equals(profileToBeMergeId, 
mergedProfileId)) {
 
-                        // todo move in update by query + script
+                        // TODO (UNOMI-748): the following updates are 
asynchron due to usage of bulk processor in ElasticSearch persistence service 
update function.
+                        //  We could consider replacing those updates(one item 
at a time) by updateByQueryAndScript to avoid loading all the sessions/events 
in memory,
+                        //  but we would loose the asynchronous nature (By 
doing that request may take longer than before,
+                        //  and could potentially break client side timeouts 
on requests)
                         List<Session> oldSessions = 
persistenceService.query("profileId", profileToBeMergeId, null, Session.class);
                         for (Session oldSession : oldSessions) {
                             if 
(!oldSession.getItemId().equals(event.getSession().getItemId())) {
@@ -127,7 +128,6 @@ public class MergeProfilesOnPropertyAction implements 
ActionExecutor {
                             }
                         }
 
-                        // todo move in update by query + script
                         List<Event> oldEvents = 
persistenceService.query("profileId", profileToBeMergeId, null, Event.class);
                         for (Event oldEvent : oldEvents) {
                             if 
(!oldEvent.getItemId().equals(event.getItemId())) {
@@ -190,22 +190,6 @@ public class MergeProfilesOnPropertyAction implements 
ActionExecutor {
         }
     }
 
-    private void updateAllSessionsForProfile(String newProfileId, String 
oldProfileId) {
-        String[] scripts = new String[]{"updateProfileId"};
-
-        Map<String, Object>[] scriptParams = new HashMap[1];
-        scriptParams[0] = new HashMap<>();
-        scriptParams[0].put("profileId", newProfileId);
-
-        Condition[] conditions = new Condition[1];
-        conditions[0] = new Condition();
-        
conditions[0].setConditionType(definitionsService.getConditionType("sessionPropertyCondition"));
-        conditions[0].setParameter("propertyName", "profileId");
-        conditions[0].setParameter("comparisonOperator", "equals");
-        conditions[0].setParameter("propertyValue", oldProfileId);
-        persistenceService.updateWithQueryAndStoredScript(Session.class, 
scripts, scriptParams, conditions);
-    }
-
     public void setProfileService(ProfileService profileService) {
         this.profileService = profileService;
     }
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 1b64cbdc5..36038fd76 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -1016,7 +1016,7 @@ public class SegmentServiceImpl extends 
AbstractServiceImpl implements SegmentSe
 
             if (batch.size() == segmentUpdateBatchSize || 
(!entryIterator.hasNext() && batch.size() > 0)) {
                 try {
-                    persistenceService.update(batch, null, Profile.class);
+                    persistenceService.update(batch, Profile.class);
                 } catch (Exception e) {
                     logger.error("Error updating {} profiles for past event 
system properties", batch.size(), e);
                 } finally {
@@ -1115,7 +1115,7 @@ public class SegmentServiceImpl extends 
AbstractServiceImpl implements SegmentSe
             Map<String, Object> propertiesToUpdate = 
buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
             profileToPropertiesMap.put(profileToUpdate, propertiesToUpdate);
         }
-        List<String> failedItemsIds = 
persistenceService.update(profileToPropertiesMap, null, Profile.class);
+        List<String> failedItemsIds = 
persistenceService.update(profileToPropertiesMap, Profile.class);
         if (failedItemsIds != null)
             failedItemsIds.forEach(s -> retryFailedSegmentUpdate(s, segmentId, 
isAdd));
     }

Reply via email to