This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push: new ba58fafde UNOMI-748, UNOMI-749: more improvements for merge system: updateWithS… (#593) ba58fafde is described below commit ba58fafde122ac82d9764329edc620fda2b936bc Author: kevan Jahanshahi <jke...@apache.org> AuthorDate: Thu Mar 23 09:33:11 2023 +0100 UNOMI-748, UNOMI-749: more improvements for merge system: updateWithS… (#593) * UNOMI-748, UNOMI-749: more improvements for merge system: updateWithScript to reassign past sessions/events, fix inconsistency in sessions profileId, use schedulerService to support asynchronous update of past browsing data. * UNOMI-748, UNOMI-749: add more tests --- .../unomi/privacy/internal/PrivacyServiceImpl.java | 5 - .../test/java/org/apache/unomi/itests/BaseIT.java | 2 + .../org/apache/unomi/itests/ProfileMergeIT.java | 100 ++++++++++++++++++-- persistence-elasticsearch/core/pom.xml | 9 +- .../ElasticSearchPersistenceServiceImpl.java | 30 ++++++ .../actions/MergeProfilesOnPropertyAction.java | 105 ++++++++++++--------- .../META-INF/cxs/painless/updateProfileId.painless | 32 +++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 2 + services/pom.xml | 4 - .../services/impl/segments/SegmentServiceImpl.java | 25 ----- 10 files changed, 224 insertions(+), 90 deletions(-) diff --git a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java index fb357d559..5d6abb828 100644 --- a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java +++ b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java @@ -127,11 +127,6 @@ public class PrivacyServiceImpl implements PrivacyService { @Override public Boolean anonymizeBrowsingData(String profileId) { - Profile profile = profileService.load(profileId); - if 
(profile == null) { - return false; - } - List<Session> sessions = profileService.getProfileSessions(profileId, null, 0, -1, null).getList(); if (sessions.isEmpty()) { return false; diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java index 8c166b33c..f62d32d8a 100644 --- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java @@ -140,6 +140,7 @@ public abstract class BaseIT extends KarafTestSupport { protected RulesService rulesService; protected DefinitionsService definitionsService; protected ProfileService profileService; + protected PrivacyService privacyService; protected EventService eventService; protected BundleWatcher bundleWatcher; protected GroovyActionsService groovyActionsService; @@ -184,6 +185,7 @@ public abstract class BaseIT extends KarafTestSupport { rulesService = getOsgiService(RulesService.class, 600000); definitionsService = getOsgiService(DefinitionsService.class, 600000); profileService = getOsgiService(ProfileService.class, 600000); + privacyService = getOsgiService(PrivacyService.class, 600000); eventService = getOsgiService(EventService.class, 600000); groovyActionsService = getOsgiService(GroovyActionsService.class, 600000); segmentService = getOsgiService(SegmentService.class, 600000); diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java index 7815baccb..43c0b0429 100644 --- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java @@ -220,7 +220,7 @@ public class ProfileMergeIT extends BaseIT { * In case of merge, existing sessions/events from previous profileId should be rewritten to use the new master profileId */ @Test - public void testProfileMergeOnPropertyAction_simpleMergeRewriteExistingSessionsEvents() throws 
InterruptedException { + public void testProfileMergeOnPropertyAction_rewriteExistingSessionsEvents() throws InterruptedException { Condition matchAll = new Condition(definitionsService.getConditionType("matchAllCondition")); // create rule createAndWaitForRule(createMergeOnPropertyRule(false, "email")); @@ -237,7 +237,7 @@ public class ProfileMergeIT extends BaseIT { eventProfile.setProperty("email", "usern...@domain.com"); profileService.save(eventProfile); - // create 5 sessions and 5 events for master profile. + // create 5 past sessions and 5 past events. List<Session> sessionsToBeRewritten = new ArrayList<>(); List<Event> eventsToBeRewritten = new ArrayList<>(); for (int i = 1; i <= 5; i++) { @@ -250,8 +250,16 @@ public class ProfileMergeIT extends BaseIT { persistenceService.save(sessionToBeRewritten); persistenceService.save(eventToBeRewritten); } - keepTrying("Wait for sessions and events to be persisted", () -> persistenceService.queryCount(matchAll, Session.ITEM_TYPE) + persistenceService.queryCount(matchAll, Event.ITEM_TYPE), - (count) -> count == 10, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + for (Session session : sessionsToBeRewritten) { + keepTrying("Wait for session: " + session.getItemId() + " to be indexed", + () -> persistenceService.query("itemId", session.getItemId(), null, Session.class), + (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + for (Event event : eventsToBeRewritten) { + keepTrying("Wait for event: " + event.getItemId() + " to be indexed", + () -> persistenceService.query("itemId", event.getItemId(), null, Event.class), + (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } keepTrying("Profile with id masterProfileID not found in the required time", () -> profileService.load("masterProfileID"), Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); keepTrying("Profile with id eventProfileID not found in the required time", () -> 
profileService.load("eventProfileID"), @@ -287,12 +295,90 @@ public class ProfileMergeIT extends BaseIT { () -> persistenceService.queryCount(sessionProfileIDRewrittenCondition, Session.ITEM_TYPE), (count) -> count == 5, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); - // TODO uncomment this when UNOMI-749 is fixed, currently session loaded are inconsistent - /* for (Session session : sessionsToBeRewritten) { + for (Session session : sessionsToBeRewritten) { keepTrying("Wait for session: " + session.getItemId() + " profileId to be rewritten for masterProfileID", () -> persistenceService.load(session.getItemId(), Session.class), (loadedSession) -> loadedSession.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); - } */ + } + } + + /** + * If master profile is flagged as anonymous profile, then after the merge all past sessions/events should be anonymized + */ + @Test + public void testProfileMergeOnPropertyAction_rewriteExistingSessionsEventsAnonymous() throws InterruptedException { + Condition matchAll = new Condition(definitionsService.getConditionType("matchAllCondition")); + // create rule + createAndWaitForRule(createMergeOnPropertyRule(false, "email")); + + // create master profile + Profile masterProfile = new Profile(); + masterProfile.setItemId("masterProfileID"); + masterProfile.setProperty("email", "usern...@domain.com"); + masterProfile.setSystemProperty("mergeIdentifier", "usern...@domain.com"); + profileService.save(masterProfile); + privacyService.setRequireAnonymousBrowsing(masterProfile.getItemId(), true, null); + + Profile eventProfile = new Profile(); + eventProfile.setItemId("eventProfileID"); + eventProfile.setProperty("email", "usern...@domain.com"); + profileService.save(eventProfile); + + // create 5 sessions and 5 events for master profile. 
+ List<Session> sessionsToBeRewritten = new ArrayList<>(); + List<Event> eventsToBeRewritten = new ArrayList<>(); + for (int i = 1; i <= 5; i++) { + Session sessionToBeRewritten = new Session("simpleSession_"+ i, eventProfile, new Date(), null); + sessionsToBeRewritten.add(sessionToBeRewritten); + Event eventToBeRewritten = new Event("view", sessionToBeRewritten, eventProfile, null, null, null, new Date()); + eventsToBeRewritten.add(eventToBeRewritten); + + persistenceService.save(sessionToBeRewritten); + persistenceService.save(eventToBeRewritten); + } + for (Session session : sessionsToBeRewritten) { + keepTrying("Wait for session: " + session.getItemId() + " to be indexed", + () -> persistenceService.query("itemId", session.getItemId(), null, Session.class), + (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + for (Event event : eventsToBeRewritten) { + keepTrying("Wait for event: " + event.getItemId() + " to be indexed", + () -> persistenceService.query("itemId", event.getItemId(), null, Event.class), + (list) -> list.size() == 1, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + keepTrying("Profile with id masterProfileID (should required anonymous browsing) not found in the required time", + () -> profileService.load("masterProfileID"), + profile -> profile != null && privacyService.isRequireAnonymousBrowsing(profile), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + keepTrying("Profile with id eventProfileID not found in the required time", () -> profileService.load("eventProfileID"), + Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + + // Trigger the merge + Session simpleSession = new Session("simpleSession", eventProfile, new Date(), null); + Event mergeEvent = new Event(TEST_EVENT_TYPE, simpleSession, eventProfile, null, null, eventProfile, new Date()); + eventService.send(mergeEvent); + + // Check that master profile is now used, but anonymous browsing is respected: + 
Assert.assertNotNull(mergeEvent.getProfile()); + Assert.assertEquals("masterProfileID", mergeEvent.getProfile().getItemId()); // We still have profile in the event + Assert.assertNull(mergeEvent.getProfileId()); // But profileId prop is null due to anonymous browsing + Assert.assertNull(mergeEvent.getSession().getProfile().getItemId()); // Same for the event session + Assert.assertNull(mergeEvent.getSession().getProfileId()); + Assert.assertEquals(mergeEvent.getSession().getProfileId(), mergeEvent.getProfileId()); + Assert.assertEquals("usern...@domain.com", mergeEvent.getProfile().getSystemProperties().get("mergeIdentifier")); + + // Check events are correctly rewritten (Anonymous !) + for (Event event : eventsToBeRewritten) { + keepTrying("Wait for event: " + event.getItemId() + " profileId to be rewritten for NULL due to anonymous browsing", + () -> persistenceService.load(event.getItemId(), Event.class), + (loadedEvent) -> loadedEvent.getProfileId() == null, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } + + // Check sessions are correctly rewritten (Anonymous !) 
+ for (Session session : sessionsToBeRewritten) { + keepTrying("Wait for session: " + session.getItemId() + " profileId to be rewritten for NULL due to anonymous browsing", + () -> persistenceService.load(session.getItemId(), Session.class), + (loadedSession) -> loadedSession.getProfileId() == null, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); + } } /** diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml index 8aa4fd7ba..770c6cea8 100644 --- a/persistence-elasticsearch/core/pom.xml +++ b/persistence-elasticsearch/core/pom.xml @@ -196,6 +196,10 @@ <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> <dependency> <groupId>org.apache.unomi</groupId> @@ -209,11 +213,6 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <scope>test</scope> - </dependency> <dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast-all</artifactId> diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 9de538795..4562dd5cc 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -19,6 +19,8 @@ package org.apache.unomi.persistence.elasticsearch; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.IOUtils; import 
org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -116,8 +118,10 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; @@ -460,11 +464,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, registerRolloverLifecyclePolicy(); loadPredefinedMappings(bundleContext, false); + loadPainlessScripts(bundleContext); // load predefined mappings and condition dispatchers of any bundles that were started before this one. for (Bundle existingBundle : bundleContext.getBundles()) { if (existingBundle.getBundleContext() != null) { loadPredefinedMappings(existingBundle.getBundleContext(), false); + loadPainlessScripts(existingBundle.getBundleContext()); } } @@ -690,6 +696,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, switch (event.getType()) { case BundleEvent.STARTING: loadPredefinedMappings(event.getBundle().getBundleContext(), true); + loadPainlessScripts(event.getBundle().getBundleContext()); break; } } @@ -722,6 +729,29 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } + private void loadPainlessScripts(BundleContext bundleContext) { + Enumeration<URL> scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true); + if (scriptsURL == null) { + return; + } + + Map<String, String> scriptsById = new HashMap<>(); + while (scriptsURL.hasMoreElements()) { + URL scriptURL = scriptsURL.nextElement(); + logger.info("Found painless script at " + scriptURL + ", loading... 
"); + try (InputStream in = scriptURL.openStream()) { + String script = IOUtils.toString(in, StandardCharsets.UTF_8); + String scriptId = FilenameUtils.getBaseName(scriptURL.getPath()); + scriptsById.put(scriptId, script); + } catch (Exception e) { + logger.error("Error while loading painless script " + scriptURL, e); + } + + } + + storeScripts(scriptsById); + } + private String loadMappingFile(URL predefinedMappingURL) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(predefinedMappingURL.openStream())); diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java index 6ecd0e797..e52c8fd84 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java @@ -30,8 +30,9 @@ import org.apache.unomi.persistence.spi.PersistenceService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class MergeProfilesOnPropertyAction implements ActionExecutor { private static final Logger logger = LoggerFactory.getLogger(MergeProfilesOnPropertyAction.class.getName()); @@ -41,6 +42,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { private EventService eventService; private DefinitionsService definitionsService; private PrivacyService privacyService; + private SchedulerService schedulerService; private int maxProfilesInOneMerge = -1; public int execute(Action action, Event event) { @@ -48,6 +50,8 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { Profile eventProfile = event.getProfile(); final 
String mergePropName = (String) action.getParameterValues().get("mergeProfilePropertyName"); final String mergePropValue = (String) action.getParameterValues().get("mergeProfilePropertyValue"); + final String clientIdFromEvent = (String) event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE); + final String clientId = clientIdFromEvent != null ? clientIdFromEvent : "defaultClientId"; boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ? (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false; final String currentProfileMergeValue = (String) eventProfile.getSystemProperties().get(mergePropName); @@ -60,7 +64,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { // Check if the user switched to another profile if (StringUtils.isNotEmpty(currentProfileMergeValue) && !currentProfileMergeValue.equals(mergePropValue)) { - reassignSession(event, profilesToBeMerge, forceEventProfileAsMaster, mergePropName, mergePropValue); + reassignCurrentBrowsingData(event, profilesToBeMerge, forceEventProfileAsMaster, mergePropName, mergePropValue); return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED; } @@ -82,67 +86,51 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { } final String eventProfileId = eventProfile.getItemId(); - final Profile mergedProfile = profileService.mergeProfiles(forceEventProfileAsMaster ? eventProfile : profilesToBeMerge.get(0), profilesToBeMerge); - final String mergedProfileId = mergedProfile.getItemId(); + final Profile masterProfile = profileService.mergeProfiles(forceEventProfileAsMaster ? 
eventProfile : profilesToBeMerge.get(0), profilesToBeMerge); + final String masterProfileId = masterProfile.getItemId(); // Profile is still using the same profileId after being merged, no need to rewrite exists data, merge is done - if (!forceEventProfileAsMaster && mergedProfileId.equals(eventProfileId)) { + if (!forceEventProfileAsMaster && masterProfileId.equals(eventProfileId)) { return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE; } // ProfileID changed we have a lot to do // First check for privacy stuff (inherit from previous profile if necessary) if (privacyService.isRequireAnonymousBrowsing(eventProfile)) { - privacyService.setRequireAnonymousBrowsing(mergedProfileId, true, event.getScope()); + privacyService.setRequireAnonymousBrowsing(masterProfileId, true, event.getScope()); } - final boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(mergedProfileId); + final boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(masterProfileId); // Modify current session: if (event.getSession() != null) { - event.getSession().setProfile(anonymousBrowsing ? privacyService.getAnonymousProfile(mergedProfile) : mergedProfile); + event.getSession().setProfile(anonymousBrowsing ? privacyService.getAnonymousProfile(masterProfile) : masterProfile); } // Modify current event: - event.setProfileId(anonymousBrowsing ? null : mergedProfileId); - event.setProfile(mergedProfile); + event.setProfileId(anonymousBrowsing ? 
null : masterProfileId); + event.setProfile(masterProfile); event.getActionPostExecutors().add(() -> { try { - // Save event, as we changed the profileId of the current event + // This is the list of profile Ids to be updated in browsing data (events/sessions) + List<String> mergedProfileIds = profilesToBeMerge.stream() + .map(Profile::getItemId) + .filter(mergedProfileId -> !StringUtils.equals(mergedProfileId, masterProfileId)) + .collect(Collectors.toList()); + + // ASYNC: Update browsing data (events/sessions) for merged profiles + reassignPersistedBrowsingDatasAsync(anonymousBrowsing, mergedProfileIds, masterProfileId); + + // Save event, as we dynamically changed the profileId of the current event if (event.isPersistent()) { persistenceService.save(event); } - for (Profile profileToBeMerge : profilesToBeMerge) { - String profileToBeMergeId = profileToBeMerge.getItemId(); - if (!StringUtils.equals(profileToBeMergeId, mergedProfileId)) { - - // TODO (UNOMI-748): the following updates are asynchron due to usage of bulk processor in ElasticSearch persistence service update function. - // We could consider replacing those updates(one item at a time) by updateByQueryAndScript to avoid loading all the sessions/events in memory, - // but we would loose the asynchronous nature (By doing that request may take longer than before, - // and could potentially break client side timeouts on requests) - List<Event> oldEvents = persistenceService.query("profileId", profileToBeMergeId, null, Event.class); - for (Event oldEvent : oldEvents) { - if (!oldEvent.getItemId().equals(event.getItemId())) { - persistenceService.update(oldEvent, Event.class, "profileId", anonymousBrowsing ? null : mergedProfileId); - } - } - - // TODO (UNOMI-749): this is creating inconsistent sessions, they still contains old profile. 
- // And due to deserialization of sessions the profileId property will always be the one from profile stored in the session - List<Session> oldSessions = persistenceService.query("profileId", profileToBeMergeId, null, Session.class); - for (Session oldSession : oldSessions) { - if (!oldSession.getItemId().equals(event.getSession().getItemId())) { - persistenceService.update(oldSession, Session.class, "profileId", anonymousBrowsing ? null : mergedProfileId); - } - } - - final String clientIdFromEvent = (String) event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE); - String clientId = clientIdFromEvent != null ? clientIdFromEvent : "defaultClientId"; - profileService.addAliasToProfile(mergedProfileId, profileToBeMergeId, clientId); - if (profileService.load(profileToBeMergeId) != null) { - profileService.delete(profileToBeMergeId, false); - } + // Handle aliases + for (String mergedProfileId : mergedProfileIds) { + profileService.addAliasToProfile(masterProfileId, mergedProfileId, clientId); + if (persistenceService.load(mergedProfileId, Profile.class) != null) { + profileService.delete(mergedProfileId, false); } } } catch (Exception e) { @@ -164,7 +152,32 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { return persistenceService.query(propertyCondition, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList(); } - private void reassignSession(Event event, List<Profile> existingMergedProfiles, boolean forceEventProfileAsMaster, String mergePropName, String mergePropValue) { + private void reassignPersistedBrowsingDatasAsync(boolean anonymousBrowsing, List<String> mergedProfileIds, String masterProfileId) { + schedulerService.getSharedScheduleExecutorService().schedule(new TimerTask() { + @Override + public void run() { + if (!anonymousBrowsing) { + Condition profileIdsCondition = new Condition(definitionsService.getConditionType("eventPropertyCondition")); + profileIdsCondition.setParameter("propertyName","profileId"); + 
profileIdsCondition.setParameter("comparisonOperator","in"); + profileIdsCondition.setParameter("propertyValues", mergedProfileIds); + + String[] scripts = new String[]{"updateProfileId"}; + Map<String, Object>[] scriptParams = new Map[]{Collections.singletonMap("profileId", masterProfileId)}; + Condition[] conditions = new Condition[]{profileIdsCondition}; + + persistenceService.updateWithQueryAndStoredScript(Session.class, scripts, scriptParams, conditions); + persistenceService.updateWithQueryAndStoredScript(Event.class, scripts, scriptParams, conditions); + } else { + for (String mergedProfileId : mergedProfileIds) { + privacyService.anonymizeBrowsingData(mergedProfileId); + } + } + } + }, 1000, TimeUnit.MILLISECONDS); + } + + private void reassignCurrentBrowsingData(Event event, List<Profile> existingMergedProfiles, boolean forceEventProfileAsMaster, String mergePropName, String mergePropValue) { Profile eventProfile = event.getProfile(); if (existingMergedProfiles.size() > 0) { @@ -179,7 +192,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { eventProfile.getSystemProperties().put(mergePropName, mergePropValue); } - logger.info("Different users, switch to " + eventProfile.getItemId()); + logger.info("Different users, switch to {}", eventProfile.getItemId()); // At the end of the merge, we must set the merged profile as profile event to process other Actions event.setProfileId(eventProfile.getItemId()); event.setProfile(eventProfile); @@ -212,6 +225,10 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { this.definitionsService = definitionsService; } + public void setSchedulerService(SchedulerService schedulerService) { + this.schedulerService = schedulerService; + } + public void setMaxProfilesInOneMerge(String maxProfilesInOneMerge) { this.maxProfilesInOneMerge = Integer.parseInt(maxProfilesInOneMerge); } diff --git a/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless 
b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless new file mode 100644 index 000000000..31f900c62 --- /dev/null +++ b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + This script is used to update the profileId and profile.profileId in sessions and/or events after a merge situation + required params: + - params.profileId: the ID of the new profileId +*/ + +// update profileId +if (ctx._source.containsKey("profileId") && ctx._source.profileId != params.profileId) { + ctx._source.put("profileId", params.profileId) +} + +// update inner profile.profileId if the inner profile exists (in sessions for example) +if (ctx._source.containsKey("profile") && ctx._source.profile.containsKey("itemId") && ctx._source.profile.itemId != params.profileId) { + ctx._source.profile.put("itemId", params.profileId) +} \ No newline at end of file diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 3222eb680..15b24ec39 100644 --- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -43,6 +43,7 @@ <reference id="persistenceService" interface="org.apache.unomi.persistence.spi.PersistenceService"/> <reference id="profileService" interface="org.apache.unomi.api.services.ProfileService"/> <reference id="privacyService" interface="org.apache.unomi.api.services.PrivacyService"/> + <reference id="schedulerService" interface="org.apache.unomi.api.services.SchedulerService"/> <reference id="segmentService" interface="org.apache.unomi.api.services.SegmentService"/> <reference id="eventService" interface="org.apache.unomi.api.services.EventService"/> <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService"/> @@ -315,6 +316,7 @@ <property name="persistenceService" ref="persistenceService"/> <property name="definitionsService" ref="definitionsService"/> <property name="privacyService" ref="privacyService"/> + <property name="schedulerService" ref="schedulerService"/> <property name="maxProfilesInOneMerge" 
value="${base.maxProfilesInOneMerge}"/> </bean> </service> diff --git a/services/pom.xml b/services/pom.xml index a1b909362..a6e739831 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -116,10 +116,6 @@ <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> <dependency> <groupId>org.apache.karaf.cellar</groupId> <artifactId>org.apache.karaf.cellar.core</artifactId> diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index 216261c5a..1cc852f24 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -20,9 +20,6 @@ package org.apache.unomi.services.impl.segments; import com.fasterxml.jackson.core.JsonProcessingException; import net.jodah.failsafe.Failsafe; import net.jodah.failsafe.RetryPolicy; -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.CharEncoding; import org.apache.unomi.api.Event; import org.apache.unomi.api.Item; import org.apache.unomi.api.Metadata; @@ -52,9 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InputStream; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.time.Duration; import java.time.ZoneOffset; @@ -163,7 +158,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } bundleContext.addBundleListener(this); initializeTimer(); - loadScripts(); logger.info("Segment service initialized."); } @@ -1239,25 +1233,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe 
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0, segmentRefreshInterval, TimeUnit.MILLISECONDS); } - private void loadScripts() throws IOException { - Enumeration<URL> scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true); - if (scriptsURL == null) { - return; - } - - Map<String, String> scriptsById = new HashMap<>(); - while (scriptsURL.hasMoreElements()) { - URL scriptURL = scriptsURL.nextElement(); - logger.debug("Found painless script at " + scriptURL + ", loading... "); - try (InputStream in = scriptURL.openStream()) { - String script = IOUtils.toString(in, StandardCharsets.UTF_8); - String scriptId = FilenameUtils.getBaseName(scriptURL.getPath()); - scriptsById.put(scriptId, script); - } - } - persistenceService.storeScripts(scriptsById); - } - public void setTaskExecutionPeriod(long taskExecutionPeriod) { this.taskExecutionPeriod = taskExecutionPeriod; }