UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates. Replace lists with sets to optimize profile removal.
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/85f00e8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/85f00e8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/85f00e8a Branch: refs/heads/master Commit: 85f00e8ace06eb603619d604718879ef39d29f20 Parents: 1075a02 Author: Serge Huber <[email protected]> Authored: Thu Nov 24 20:17:21 2016 +0100 Committer: Serge Huber <[email protected]> Committed: Thu Nov 24 20:17:21 2016 +0100 ---------------------------------------------------------------------- .../unomi/services/services/SegmentServiceImpl.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/85f00e8a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java ---------------------------------------------------------------------- diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java index 06b83b0..d1f120f 100644 --- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java @@ -825,21 +825,25 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList segmentCondition.setParameter("propertyValue", segment.getItemId()); if(segment.getMetadata().isEnabled()) { + // the following list can grow really big if the segments are large. 
+ // We might want to replace this with scrolling if it becomes huge + // (100million profiles) List<Profile> previousProfiles = persistenceService.query(segmentCondition, null, Profile.class); List<Profile> newProfiles = persistenceService.query(segment.getCondition(), null, Profile.class); - List<Profile> add = new ArrayList<>(newProfiles); - add.removeAll(previousProfiles); - previousProfiles.removeAll(newProfiles); + Set<Profile> profilesToAdd = new HashSet<>(newProfiles); + Set<Profile> profilesToRemove = new HashSet<>(previousProfiles); + profilesToAdd.removeAll(previousProfiles); + profilesToRemove.removeAll(newProfiles); - for (Profile profileToAdd : add) { + for (Profile profileToAdd : profilesToAdd) { profileToAdd.getSegments().add(segment.getItemId()); persistenceService.update(profileToAdd.getItemId(), null, Profile.class, "segments", profileToAdd.getSegments()); Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date()); profileUpdated.setPersistent(false); eventService.send(profileUpdated); } - for (Profile profileToRemove : previousProfiles) { + for (Profile profileToRemove : profilesToRemove) { profileToRemove.getSegments().remove(segment.getItemId()); persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments()); Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
