This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch UNOMI-727-adapt-merge-rollover
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/UNOMI-727-adapt-merge-rollover
by this push:
new 1cfaf0030 UNOMI-727: adapt merge system to rollover (and cleanup too)
1cfaf0030 is described below
commit 1cfaf00305f295f7eeefa85efef58c9a1b9092d5
Author: Kevan <[email protected]>
AuthorDate: Fri Mar 10 17:02:08 2023 +0100
UNOMI-727: adapt merge system to rollover (and cleanup too)
---
.../org/apache/unomi/itests/ProfileMergeIT.java | 216 ++++++++++++++++++---
.../actions/MergeProfilesOnPropertyAction.java | 24 +--
.../services/impl/segments/SegmentServiceImpl.java | 4 +-
3 files changed, 198 insertions(+), 46 deletions(-)
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
index a5306d9f9..c54d402e9 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
@@ -16,10 +16,7 @@
*/
package org.apache.unomi.itests;
-import org.apache.unomi.api.Event;
-import org.apache.unomi.api.Metadata;
-import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.ProfileAlias;
+import org.apache.unomi.api.*;
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.rules.Rule;
@@ -49,14 +46,15 @@ public class ProfileMergeIT extends BaseIT {
private final static String TEST_PROFILE_ID =
"mergeOnPropertyTestProfileId";
@After
- public void after() {
+ public void after() throws InterruptedException {
// cleanup created data
rulesService.removeRule(TEST_RULE_ID);
+ removeItems(Profile.class, ProfileAlias.class, Event.class,
Session.class);
}
@Test
public void
testProfileMergeOnPropertyAction_dont_forceEventProfileAsMaster() throws
InterruptedException {
- createAndWaitForRule(createMergeOnPropertyRule(false));
+ createAndWaitForRule(createMergeOnPropertyRule(false, "j:nodename"));
// A new profile should be created.
Assert.assertNotEquals(sendEvent().getProfile().getItemId(),
TEST_PROFILE_ID);
@@ -64,29 +62,16 @@ public class ProfileMergeIT extends BaseIT {
@Test
public void testProfileMergeOnPropertyAction_forceEventProfileAsMaster()
throws InterruptedException {
- createAndWaitForRule(createMergeOnPropertyRule(true));
+ createAndWaitForRule(createMergeOnPropertyRule(true, "j:nodename"));
// No new profile should be created, instead the profile of the event
should be used.
Assert.assertEquals(sendEvent().getProfile().getItemId(),
TEST_PROFILE_ID);
}
@Test
- public void test() throws InterruptedException {
+ public void testProfileMergeOnPropertyAction_simpleMergeAndCheckAlias()
throws InterruptedException {
// create rule
- Condition condition = new
Condition(definitionsService.getConditionType("eventTypeCondition"));
- condition.setParameter("eventTypeId", TEST_EVENT_TYPE);
-
- final Action action = new
Action(definitionsService.getActionType("mergeProfilesOnPropertyAction"));
- action.setParameter("mergeProfilePropertyValue",
"eventProperty::target.properties(email)");
- action.setParameter("mergeProfilePropertyName", "mergeIdentifier");
- action.setParameter("forceEventProfileAsMaster", false);
-
- Rule rule = new Rule();
- rule.setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID,
"Description"));
- rule.setCondition(condition);
- rule.setActions(Collections.singletonList(action));
-
- createAndWaitForRule(rule);
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
// create master profile
Profile masterProfile = new Profile();
@@ -115,6 +100,14 @@ public class ProfileMergeIT extends BaseIT {
() -> persistenceService.getAllItems(ProfileAlias.class),
(profileAliases) -> !profileAliases.isEmpty(),
DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+ waitForNullValue("Profile with id eventProfileID not removed in the
required time",
+ () -> persistenceService.load("eventProfileID", Profile.class),
+ DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
+ keepTrying("Profile with id eventProfileID should still be accessible
due to alias",
+ () -> profileService.load("eventProfileID"), Objects::nonNull,
+ DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
+
List<ProfileAlias> aliases = persistenceService.query("profileID",
masterProfile.getItemId(), null, ProfileAlias.class);
Assert.assertFalse(aliases.isEmpty());
@@ -123,6 +116,181 @@ public class ProfileMergeIT extends BaseIT {
Assert.assertEquals("defaultClientId", aliases.get(0).getClientID());
}
+
+ /**
+ * User switch case: this can happen when a person (user A) is using
the same browser session as a previously logged-in user (user B).
+ * User A will be using user B's profile, but when user A logs in
by sending a merge event, we will detect that the mergeIdentifier is not the
same.
+ * In this case we will just switch user A profile to:
+ * - a new one, if it's the first time we encounter his own
mergeIdentifier (TESTED in this scenario)
+ * - a previous one, if we already have a profile in DB with the same
mergeIdentifier.
+ */
+ @Test
+ public void
testProfileMergeOnPropertyAction_sessionReassigned_newProfile() throws
InterruptedException {
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "[email protected]");
+ masterProfile.setSystemProperty("mergeIdentifier",
"[email protected]");
+ profileService.save(masterProfile);
+
+ keepTrying("Profile with id masterProfileID not found in the required
time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+
+ // create event profile
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "[email protected]");
+
+ Session simpleSession = new Session("simpleSession", eventProfile, new
Date(), null);
+ Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile,
null, null, eventProfile, new Date());
+ eventService.send(event);
+
+ // Session should have been reassigned and a new profile should have
been created! (We call this user switch case)
+ Assert.assertNotNull(event.getProfile());
+ Assert.assertNotEquals("eventProfileID",
event.getProfile().getItemId());
+ Assert.assertNotEquals("eventProfileID", event.getProfileId());
+ Assert.assertNotEquals("eventProfileID",
event.getSession().getProfile().getItemId());
+ Assert.assertNotEquals("eventProfileID",
event.getSession().getProfileId());
+
+ Assert.assertNotEquals("masterProfileID",
event.getProfile().getItemId());
+ Assert.assertNotEquals("masterProfileID", event.getProfileId());
+ Assert.assertNotEquals("masterProfileID",
event.getSession().getProfile().getItemId());
+ Assert.assertNotEquals("masterProfileID",
event.getSession().getProfileId());
+
+ Assert.assertEquals(event.getSession().getProfileId(),
event.getProfileId());
+ Assert.assertEquals("[email protected]",
event.getProfile().getSystemProperties().get("mergeIdentifier"));
+ }
+
+ /**
+ * User switch case: this can happen when a person (user A) is using
the same browser session as a previously logged-in user (user B).
+ * User A will be using user B's profile, but when user A logs in
by sending a merge event, we will detect that the mergeIdentifier is not the
same.
+ * In this case we will just switch user A profile to:
+ * - a new one, if it's the first time we encounter his own mergeIdentifier
+ * - a previous one, if we already have a profile in DB with the same
mergeIdentifier. (TESTED in this scenario)
+ */
+ @Test
+ public void
testProfileMergeOnPropertyAction_sessionReassigned_existingProfile() throws
InterruptedException {
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "[email protected]");
+ masterProfile.setSystemProperty("mergeIdentifier",
"[email protected]");
+ profileService.save(masterProfile);
+
+ // create a previous existing profile with same mergeIdentifier
+ Profile previousProfile = new Profile();
+ previousProfile.setItemId("previousProfileID");
+ previousProfile.setProperty("email", "[email protected]");
+ previousProfile.setSystemProperty("mergeIdentifier",
"[email protected]");
+ profileService.save(previousProfile);
+
+ keepTrying("Profile with id masterProfileID not found in the required
time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id previousProfileID not found in the
required time", () -> profileService.load("previousProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+
+ // create event profile
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "[email protected]");
+
+ Session simpleSession = new Session("simpleSession", eventProfile, new
Date(), null);
+ Event event = new Event(TEST_EVENT_TYPE, simpleSession, masterProfile,
null, null, eventProfile, new Date());
+ eventService.send(event);
+
+ // Session should have been reassigned and the previous existing profile
for mergeIdentifier: [email protected] should have been reused.
+ // No new profile should have been created: the session is switched to
the already existing profile (We call this user switch case)
+ Assert.assertNotNull(event.getProfile());
+ Assert.assertEquals("previousProfileID",
event.getProfile().getItemId());
+ Assert.assertEquals("previousProfileID", event.getProfileId());
+ Assert.assertEquals("previousProfileID",
event.getSession().getProfile().getItemId());
+ Assert.assertEquals("previousProfileID",
event.getSession().getProfileId());
+
+ Assert.assertEquals(event.getSession().getProfileId(),
event.getProfileId());
+ Assert.assertEquals("[email protected]",
event.getProfile().getSystemProperties().get("mergeIdentifier"));
+ }
+
+ /**
+ * In case of merge, existing sessions/events from previous profileId
should be rewritten to use the new master profileId
+ */
+ @Test
+ public void
testProfileMergeOnPropertyAction_simpleMergeRewriteExistingSessionsEvents()
throws InterruptedException {
+ Condition matchAll = new
Condition(definitionsService.getConditionType("matchAllCondition"));
+ // create rule
+ createAndWaitForRule(createMergeOnPropertyRule(false, "email"));
+
+ // create master profile
+ Profile masterProfile = new Profile();
+ masterProfile.setItemId("masterProfileID");
+ masterProfile.setProperty("email", "[email protected]");
+ masterProfile.setSystemProperty("mergeIdentifier",
"[email protected]");
+ profileService.save(masterProfile);
+
+ Profile eventProfile = new Profile();
+ eventProfile.setItemId("eventProfileID");
+ eventProfile.setProperty("email", "[email protected]");
+ profileService.save(eventProfile);
+
+ // create 5 sessions and 5 events for the event profile (they will be rewritten to the master profile by the merge).
+ List<Session> sessionsToBeRewritten = new ArrayList<>();
+ List<Event> eventsToBeRewritten = new ArrayList<>();
+ for (int i = 1; i <= 5; i++) {
+ Session sessionToBeRewritten = new Session("simpleSession_"+ i,
eventProfile, new Date(), null);
+ sessionsToBeRewritten.add(sessionToBeRewritten);
+ Event eventToBeRewritten = new Event("view", sessionToBeRewritten,
eventProfile, null, null, null, new Date());
+ eventsToBeRewritten.add(eventToBeRewritten);
+
+
+ persistenceService.save(sessionToBeRewritten);
+ persistenceService.save(eventToBeRewritten);
+ }
+ keepTrying("Wait for sessions and events to be persisted", () ->
persistenceService.queryCount(matchAll, Session.ITEM_TYPE) +
persistenceService.queryCount(matchAll, Event.ITEM_TYPE),
+ (count) -> count == 10, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id masterProfileID not found in the required
time", () -> profileService.load("masterProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ keepTrying("Profile with id eventProfileID not found in the required
time", () -> profileService.load("eventProfileID"),
+ Objects::nonNull, DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+
+ // Trigger the merge
+ Session simpleSession = new Session("simpleSession", eventProfile, new
Date(), null);
+ Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession,
eventProfile, null, null, eventProfile, new Date());
+ eventService.send(mergeEvent);
+
+ // Check that master profile is now used:
+ Assert.assertNotNull(mergeEvent.getProfile());
+ Assert.assertEquals("masterProfileID",
mergeEvent.getProfile().getItemId());
+ Assert.assertEquals("masterProfileID", mergeEvent.getProfileId());
+ Assert.assertEquals("masterProfileID",
mergeEvent.getSession().getProfile().getItemId());
+ Assert.assertEquals("masterProfileID",
mergeEvent.getSession().getProfileId());
+ Assert.assertEquals(mergeEvent.getSession().getProfileId(),
mergeEvent.getProfileId());
+ Assert.assertEquals("[email protected]",
mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier"));
+
+ // TODO (UNOMI-748): force the bulk processor to push requests
+ persistenceService.refresh();
+
+ // Check sessions/events are correctly rewritten
+ for (Event event : eventsToBeRewritten) {
+ keepTrying("Wait for event: " + event.getItemId() + " profileId to
be rewritten for masterProfileID",
+ () -> persistenceService.load(event.getItemId(),
Event.class),
+ (loadedEvent) ->
loadedEvent.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ }
+ for (Session session : sessionsToBeRewritten) {
+ keepTrying("Wait for session: " + session.getItemId() + "
profileId to be rewritten for masterProfileID",
+ () -> persistenceService.load(session.getItemId(),
Session.class),
+ (loadedSession) ->
loadedSession.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT,
DEFAULT_TRYING_TRIES);
+ }
+ }
+
+ /**
+ * Personalization strategies have specific handling during the merge of
two profiles.
+ * This test ensures that this specific behavior works correctly.
+ */
@Test
public void testPersonalizationStrategyStatusMerge() {
// create some statuses for the tests:
@@ -208,7 +376,7 @@ public class ProfileMergeIT extends BaseIT {
return testEvent;
}
- private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster)
throws InterruptedException {
+ private Rule createMergeOnPropertyRule(boolean forceEventProfileAsMaster,
String eventProperty) throws InterruptedException {
Rule mergeOnPropertyTestRule = new Rule();
mergeOnPropertyTestRule
.setMetadata(new Metadata(null, TEST_RULE_ID, TEST_RULE_ID,
"Test rule for testing MergeProfilesOnPropertyAction"));
@@ -218,7 +386,7 @@ public class ProfileMergeIT extends BaseIT {
mergeOnPropertyTestRule.setCondition(condition);
final Action mergeProfilesOnPropertyAction = new
Action(definitionsService.getActionType("mergeProfilesOnPropertyAction"));
-
mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue",
"eventProperty::target.properties(j:nodename)");
+
mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyValue",
"eventProperty::target.properties(" + eventProperty + ")");
mergeProfilesOnPropertyAction.setParameter("mergeProfilePropertyName",
"mergeIdentifier");
mergeProfilesOnPropertyAction.setParameter("forceEventProfileAsMaster",
forceEventProfileAsMaster);
mergeOnPropertyTestRule.setActions(Collections.singletonList(mergeProfilesOnPropertyAction));
diff --git
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 49d9c7cc7..d4404e7d9 100644
---
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -30,9 +30,7 @@ import org.apache.unomi.persistence.spi.PersistenceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
public class MergeProfilesOnPropertyAction implements ActionExecutor {
@@ -119,7 +117,10 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
String profileToBeMergeId = profileToBeMerge.getItemId();
if (!StringUtils.equals(profileToBeMergeId,
mergedProfileId)) {
- // todo move in update by query + script
+ // TODO (UNOMI-748): the following updates are
asynchronous due to usage of the bulk processor in the ElasticSearch persistence service
update function.
+ // We could consider replacing those updates (one item
at a time) with updateByQueryAndScript to avoid loading all the sessions/events
in memory,
+ // but we would lose the asynchronous nature (by
doing that, requests may take longer than before
+ // and could potentially break client-side timeouts
on requests).
List<Session> oldSessions =
persistenceService.query("profileId", profileToBeMergeId, null, Session.class);
for (Session oldSession : oldSessions) {
if
(!oldSession.getItemId().equals(event.getSession().getItemId())) {
@@ -127,7 +128,6 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
}
}
- // todo move in update by query + script
List<Event> oldEvents =
persistenceService.query("profileId", profileToBeMergeId, null, Event.class);
for (Event oldEvent : oldEvents) {
if
(!oldEvent.getItemId().equals(event.getItemId())) {
@@ -190,22 +190,6 @@ public class MergeProfilesOnPropertyAction implements
ActionExecutor {
}
}
- private void updateAllSessionsForProfile(String newProfileId, String
oldProfileId) {
- String[] scripts = new String[]{"updateProfileId"};
-
- Map<String, Object>[] scriptParams = new HashMap[1];
- scriptParams[0] = new HashMap<>();
- scriptParams[0].put("profileId", newProfileId);
-
- Condition[] conditions = new Condition[1];
- conditions[0] = new Condition();
-
conditions[0].setConditionType(definitionsService.getConditionType("sessionPropertyCondition"));
- conditions[0].setParameter("propertyName", "profileId");
- conditions[0].setParameter("comparisonOperator", "equals");
- conditions[0].setParameter("propertyValue", oldProfileId);
- persistenceService.updateWithQueryAndStoredScript(Session.class,
scripts, scriptParams, conditions);
- }
-
public void setProfileService(ProfileService profileService) {
this.profileService = profileService;
}
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 1b64cbdc5..36038fd76 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -1016,7 +1016,7 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
if (batch.size() == segmentUpdateBatchSize ||
(!entryIterator.hasNext() && batch.size() > 0)) {
try {
- persistenceService.update(batch, null, Profile.class);
+ persistenceService.update(batch, Profile.class);
} catch (Exception e) {
logger.error("Error updating {} profiles for past event
system properties", batch.size(), e);
} finally {
@@ -1115,7 +1115,7 @@ public class SegmentServiceImpl extends
AbstractServiceImpl implements SegmentSe
Map<String, Object> propertiesToUpdate =
buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
profileToPropertiesMap.put(profileToUpdate, propertiesToUpdate);
}
- List<String> failedItemsIds =
persistenceService.update(profileToPropertiesMap, null, Profile.class);
+ List<String> failedItemsIds =
persistenceService.update(profileToPropertiesMap, Profile.class);
if (failedItemsIds != null)
failedItemsIds.forEach(s -> retryFailedSegmentUpdate(s, segmentId,
isAdd));
}