This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch merge-improved2 in repository https://gitbox.apache.org/repos/asf/unomi.git
commit e307dd9cd6817524f5516f478f831ad3ccea4478 Author: Kevan <ke...@jahia.com> AuthorDate: Mon Mar 20 16:43:28 2023 +0100 UNOMI-748, UNOMI-749: more improvements for merge system: updateWithScript to reassign past sessions/events, fix inconsistency in sessions profileId, use schedulerService to support asynchronous update of past browsing data. --- .../unomi/privacy/internal/PrivacyServiceImpl.java | 5 - .../org/apache/unomi/itests/ProfileMergeIT.java | 5 +- persistence-elasticsearch/core/pom.xml | 9 +- .../ElasticSearchPersistenceServiceImpl.java | 30 ++++++ .../actions/MergeProfilesOnPropertyAction.java | 105 ++++++++++++--------- .../META-INF/cxs/painless/updateProfileId.painless | 32 +++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 2 + services/pom.xml | 4 - .../services/impl/segments/SegmentServiceImpl.java | 25 ----- 9 files changed, 131 insertions(+), 86 deletions(-) diff --git a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java index fb357d559..5d6abb828 100644 --- a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java +++ b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java @@ -127,11 +127,6 @@ public class PrivacyServiceImpl implements PrivacyService { @Override public Boolean anonymizeBrowsingData(String profileId) { - Profile profile = profileService.load(profileId); - if (profile == null) { - return false; - } - List<Session> sessions = profileService.getProfileSessions(profileId, null, 0, -1, null).getList(); if (sessions.isEmpty()) { return false; diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java index 7815baccb..ccc450bbe 100644 --- 
a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java @@ -287,12 +287,11 @@ public class ProfileMergeIT extends BaseIT { () -> persistenceService.queryCount(sessionProfileIDRewrittenCondition, Session.ITEM_TYPE), (count) -> count == 5, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); - // TODO uncomment this when UNOMI-749 is fixed, currently session loaded are inconsistent - /* for (Session session : sessionsToBeRewritten) { + for (Session session : sessionsToBeRewritten) { keepTrying("Wait for session: " + session.getItemId() + " profileId to be rewritten for masterProfileID", () -> persistenceService.load(session.getItemId(), Session.class), (loadedSession) -> loadedSession.getProfileId().equals("masterProfileID"), DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES); - } */ + } } /** diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml index 8aa4fd7ba..770c6cea8 100644 --- a/persistence-elasticsearch/core/pom.xml +++ b/persistence-elasticsearch/core/pom.xml @@ -196,6 +196,10 @@ <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> <dependency> <groupId>org.apache.unomi</groupId> @@ -209,11 +213,6 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <scope>test</scope> - </dependency> <dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast-all</artifactId> diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 9de538795..4562dd5cc 100644 --- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -19,6 +19,8 @@ package org.apache.unomi.persistence.elasticsearch; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -116,8 +118,10 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; @@ -460,11 +464,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, registerRolloverLifecyclePolicy(); loadPredefinedMappings(bundleContext, false); + loadPainlessScripts(bundleContext); // load predefined mappings and condition dispatchers of any bundles that were started before this one. 
for (Bundle existingBundle : bundleContext.getBundles()) { if (existingBundle.getBundleContext() != null) { loadPredefinedMappings(existingBundle.getBundleContext(), false); + loadPainlessScripts(existingBundle.getBundleContext()); } } @@ -690,6 +696,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, switch (event.getType()) { case BundleEvent.STARTING: loadPredefinedMappings(event.getBundle().getBundleContext(), true); + loadPainlessScripts(event.getBundle().getBundleContext()); break; } } @@ -722,6 +729,29 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } + private void loadPainlessScripts(BundleContext bundleContext) { + Enumeration<URL> scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true); + if (scriptsURL == null) { + return; + } + + Map<String, String> scriptsById = new HashMap<>(); + while (scriptsURL.hasMoreElements()) { + URL scriptURL = scriptsURL.nextElement(); + logger.info("Found painless script at " + scriptURL + ", loading... 
"); + try (InputStream in = scriptURL.openStream()) { + String script = IOUtils.toString(in, StandardCharsets.UTF_8); + String scriptId = FilenameUtils.getBaseName(scriptURL.getPath()); + scriptsById.put(scriptId, script); + } catch (Exception e) { + logger.error("Error while loading painless script " + scriptURL, e); + } + + } + + storeScripts(scriptsById); + } + private String loadMappingFile(URL predefinedMappingURL) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(predefinedMappingURL.openStream())); diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java index 6ecd0e797..e52c8fd84 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java @@ -30,8 +30,9 @@ import org.apache.unomi.persistence.spi.PersistenceService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class MergeProfilesOnPropertyAction implements ActionExecutor { private static final Logger logger = LoggerFactory.getLogger(MergeProfilesOnPropertyAction.class.getName()); @@ -41,6 +42,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { private EventService eventService; private DefinitionsService definitionsService; private PrivacyService privacyService; + private SchedulerService schedulerService; private int maxProfilesInOneMerge = -1; public int execute(Action action, Event event) { @@ -48,6 +50,8 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { Profile eventProfile = event.getProfile(); final 
String mergePropName = (String) action.getParameterValues().get("mergeProfilePropertyName"); final String mergePropValue = (String) action.getParameterValues().get("mergeProfilePropertyValue"); + final String clientIdFromEvent = (String) event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE); + final String clientId = clientIdFromEvent != null ? clientIdFromEvent : "defaultClientId"; boolean forceEventProfileAsMaster = action.getParameterValues().containsKey("forceEventProfileAsMaster") ? (boolean) action.getParameterValues().get("forceEventProfileAsMaster") : false; final String currentProfileMergeValue = (String) eventProfile.getSystemProperties().get(mergePropName); @@ -60,7 +64,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { // Check if the user switched to another profile if (StringUtils.isNotEmpty(currentProfileMergeValue) && !currentProfileMergeValue.equals(mergePropValue)) { - reassignSession(event, profilesToBeMerge, forceEventProfileAsMaster, mergePropName, mergePropValue); + reassignCurrentBrowsingData(event, profilesToBeMerge, forceEventProfileAsMaster, mergePropName, mergePropValue); return EventService.PROFILE_UPDATED + EventService.SESSION_UPDATED; } @@ -82,67 +86,51 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { } final String eventProfileId = eventProfile.getItemId(); - final Profile mergedProfile = profileService.mergeProfiles(forceEventProfileAsMaster ? eventProfile : profilesToBeMerge.get(0), profilesToBeMerge); - final String mergedProfileId = mergedProfile.getItemId(); + final Profile masterProfile = profileService.mergeProfiles(forceEventProfileAsMaster ? 
eventProfile : profilesToBeMerge.get(0), profilesToBeMerge); + final String masterProfileId = masterProfile.getItemId(); // Profile is still using the same profileId after being merged, no need to rewrite exists data, merge is done - if (!forceEventProfileAsMaster && mergedProfileId.equals(eventProfileId)) { + if (!forceEventProfileAsMaster && masterProfileId.equals(eventProfileId)) { return profileUpdated ? EventService.PROFILE_UPDATED : EventService.NO_CHANGE; } // ProfileID changed we have a lot to do // First check for privacy stuff (inherit from previous profile if necessary) if (privacyService.isRequireAnonymousBrowsing(eventProfile)) { - privacyService.setRequireAnonymousBrowsing(mergedProfileId, true, event.getScope()); + privacyService.setRequireAnonymousBrowsing(masterProfileId, true, event.getScope()); } - final boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(mergedProfileId); + final boolean anonymousBrowsing = privacyService.isRequireAnonymousBrowsing(masterProfileId); // Modify current session: if (event.getSession() != null) { - event.getSession().setProfile(anonymousBrowsing ? privacyService.getAnonymousProfile(mergedProfile) : mergedProfile); + event.getSession().setProfile(anonymousBrowsing ? privacyService.getAnonymousProfile(masterProfile) : masterProfile); } // Modify current event: - event.setProfileId(anonymousBrowsing ? null : mergedProfileId); - event.setProfile(mergedProfile); + event.setProfileId(anonymousBrowsing ? 
null : masterProfileId); + event.setProfile(masterProfile); event.getActionPostExecutors().add(() -> { try { - // Save event, as we changed the profileId of the current event + // This is the list of profile Ids to be updated in browsing data (events/sessions) + List<String> mergedProfileIds = profilesToBeMerge.stream() + .map(Profile::getItemId) + .filter(mergedProfileId -> !StringUtils.equals(mergedProfileId, masterProfileId)) + .collect(Collectors.toList()); + + // ASYNC: Update browsing data (events/sessions) for merged profiles + reassignPersistedBrowsingDatasAsync(anonymousBrowsing, mergedProfileIds, masterProfileId); + + // Save event, as we dynamically changed the profileId of the current event if (event.isPersistent()) { persistenceService.save(event); } - for (Profile profileToBeMerge : profilesToBeMerge) { - String profileToBeMergeId = profileToBeMerge.getItemId(); - if (!StringUtils.equals(profileToBeMergeId, mergedProfileId)) { - - // TODO (UNOMI-748): the following updates are asynchron due to usage of bulk processor in ElasticSearch persistence service update function. - // We could consider replacing those updates(one item at a time) by updateByQueryAndScript to avoid loading all the sessions/events in memory, - // but we would loose the asynchronous nature (By doing that request may take longer than before, - // and could potentially break client side timeouts on requests) - List<Event> oldEvents = persistenceService.query("profileId", profileToBeMergeId, null, Event.class); - for (Event oldEvent : oldEvents) { - if (!oldEvent.getItemId().equals(event.getItemId())) { - persistenceService.update(oldEvent, Event.class, "profileId", anonymousBrowsing ? null : mergedProfileId); - } - } - - // TODO (UNOMI-749): this is creating inconsistent sessions, they still contains old profile. 
- // And due to deserialization of sessions the profileId property will always be the one from profile stored in the session - List<Session> oldSessions = persistenceService.query("profileId", profileToBeMergeId, null, Session.class); - for (Session oldSession : oldSessions) { - if (!oldSession.getItemId().equals(event.getSession().getItemId())) { - persistenceService.update(oldSession, Session.class, "profileId", anonymousBrowsing ? null : mergedProfileId); - } - } - - final String clientIdFromEvent = (String) event.getAttributes().get(Event.CLIENT_ID_ATTRIBUTE); - String clientId = clientIdFromEvent != null ? clientIdFromEvent : "defaultClientId"; - profileService.addAliasToProfile(mergedProfileId, profileToBeMergeId, clientId); - if (profileService.load(profileToBeMergeId) != null) { - profileService.delete(profileToBeMergeId, false); - } + // Handle aliases + for (String mergedProfileId : mergedProfileIds) { + profileService.addAliasToProfile(masterProfileId, mergedProfileId, clientId); + if (persistenceService.load(mergedProfileId, Profile.class) != null) { + profileService.delete(mergedProfileId, false); } } } catch (Exception e) { @@ -164,7 +152,32 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { return persistenceService.query(propertyCondition, "properties.firstVisit", Profile.class, 0, maxProfilesInOneMerge).getList(); } - private void reassignSession(Event event, List<Profile> existingMergedProfiles, boolean forceEventProfileAsMaster, String mergePropName, String mergePropValue) { + private void reassignPersistedBrowsingDatasAsync(boolean anonymousBrowsing, List<String> mergedProfileIds, String masterProfileId) { + schedulerService.getSharedScheduleExecutorService().schedule(new TimerTask() { + @Override + public void run() { + if (!anonymousBrowsing) { + Condition profileIdsCondition = new Condition(definitionsService.getConditionType("eventPropertyCondition")); + profileIdsCondition.setParameter("propertyName","profileId"); + 
profileIdsCondition.setParameter("comparisonOperator","in"); + profileIdsCondition.setParameter("propertyValues", mergedProfileIds); + + String[] scripts = new String[]{"updateProfileId"}; + Map<String, Object>[] scriptParams = new Map[]{Collections.singletonMap("profileId", masterProfileId)}; + Condition[] conditions = new Condition[]{profileIdsCondition}; + + persistenceService.updateWithQueryAndStoredScript(Session.class, scripts, scriptParams, conditions); + persistenceService.updateWithQueryAndStoredScript(Event.class, scripts, scriptParams, conditions); + } else { + for (String mergedProfileId : mergedProfileIds) { + privacyService.anonymizeBrowsingData(mergedProfileId); + } + } + } + }, 1000, TimeUnit.MILLISECONDS); + } + + private void reassignCurrentBrowsingData(Event event, List<Profile> existingMergedProfiles, boolean forceEventProfileAsMaster, String mergePropName, String mergePropValue) { Profile eventProfile = event.getProfile(); if (existingMergedProfiles.size() > 0) { @@ -179,7 +192,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { eventProfile.getSystemProperties().put(mergePropName, mergePropValue); } - logger.info("Different users, switch to " + eventProfile.getItemId()); + logger.info("Different users, switch to {}", eventProfile.getItemId()); // At the end of the merge, we must set the merged profile as profile event to process other Actions event.setProfileId(eventProfile.getItemId()); event.setProfile(eventProfile); @@ -212,6 +225,10 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { this.definitionsService = definitionsService; } + public void setSchedulerService(SchedulerService schedulerService) { + this.schedulerService = schedulerService; + } + public void setMaxProfilesInOneMerge(String maxProfilesInOneMerge) { this.maxProfilesInOneMerge = Integer.parseInt(maxProfilesInOneMerge); } diff --git a/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless 
b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless new file mode 100644 index 000000000..31f900c62 --- /dev/null +++ b/plugins/baseplugin/src/main/resources/META-INF/cxs/painless/updateProfileId.painless @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + This script is used to update the profileId and profile.profileId in sessions and/or events after a merge situation + required params: + - params.profileId: the ID of the new profileId +*/ + +// update profileId +if (ctx._source.containsKey("profileId") && ctx._source.profileId != params.profileId) { + ctx._source.put("profileId", params.profileId) +} + +// update inner profile.profileId if the inner profile exists (in sessions for example) +if (ctx._source.containsKey("profile") && ctx._source.profile.containsKey("itemId") && ctx._source.profile.itemId != params.profileId) { + ctx._source.profile.put("itemId", params.profileId) +} \ No newline at end of file diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 3222eb680..15b24ec39 100644 --- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -43,6 +43,7 @@ <reference id="persistenceService" interface="org.apache.unomi.persistence.spi.PersistenceService"/> <reference id="profileService" interface="org.apache.unomi.api.services.ProfileService"/> <reference id="privacyService" interface="org.apache.unomi.api.services.PrivacyService"/> + <reference id="schedulerService" interface="org.apache.unomi.api.services.SchedulerService"/> <reference id="segmentService" interface="org.apache.unomi.api.services.SegmentService"/> <reference id="eventService" interface="org.apache.unomi.api.services.EventService"/> <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService"/> @@ -315,6 +316,7 @@ <property name="persistenceService" ref="persistenceService"/> <property name="definitionsService" ref="definitionsService"/> <property name="privacyService" ref="privacyService"/> + <property name="schedulerService" ref="schedulerService"/> <property name="maxProfilesInOneMerge" 
value="${base.maxProfilesInOneMerge}"/> </bean> </service> diff --git a/services/pom.xml b/services/pom.xml index a1b909362..a6e739831 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -116,10 +116,6 @@ <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> <dependency> <groupId>org.apache.karaf.cellar</groupId> <artifactId>org.apache.karaf.cellar.core</artifactId> diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index 216261c5a..1cc852f24 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -20,9 +20,6 @@ package org.apache.unomi.services.impl.segments; import com.fasterxml.jackson.core.JsonProcessingException; import net.jodah.failsafe.Failsafe; import net.jodah.failsafe.RetryPolicy; -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.CharEncoding; import org.apache.unomi.api.Event; import org.apache.unomi.api.Item; import org.apache.unomi.api.Metadata; @@ -52,9 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InputStream; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.time.Duration; import java.time.ZoneOffset; @@ -163,7 +158,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } bundleContext.addBundleListener(this); initializeTimer(); - loadScripts(); logger.info("Segment service initialized."); } @@ -1239,25 +1233,6 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe 
schedulerService.getScheduleExecutorService().scheduleAtFixedRate(task, 0, segmentRefreshInterval, TimeUnit.MILLISECONDS); } - private void loadScripts() throws IOException { - Enumeration<URL> scriptsURL = bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", true); - if (scriptsURL == null) { - return; - } - - Map<String, String> scriptsById = new HashMap<>(); - while (scriptsURL.hasMoreElements()) { - URL scriptURL = scriptsURL.nextElement(); - logger.debug("Found painless script at " + scriptURL + ", loading... "); - try (InputStream in = scriptURL.openStream()) { - String script = IOUtils.toString(in, StandardCharsets.UTF_8); - String scriptId = FilenameUtils.getBaseName(scriptURL.getPath()); - scriptsById.put(scriptId, script); - } - } - persistenceService.storeScripts(scriptsById); - } - public void setTaskExecutionPeriod(long taskExecutionPeriod) { this.taskExecutionPeriod = taskExecutionPeriod; }