Further optimizations - Add shell script to build and run without integration and performance tests. - Segment updates should now use a lot less memory since we now use scrolling queries
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/28d7dbde Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/28d7dbde Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/28d7dbde Branch: refs/heads/master Commit: 28d7dbde4728ed41eed6cc43022039ad02ddce7e Parents: d7a7969 Author: Serge Huber <[email protected]> Authored: Thu Dec 15 16:22:38 2016 +0100 Committer: Serge Huber <[email protected]> Committed: Thu Dec 15 16:22:38 2016 +0100 ---------------------------------------------------------------------- .../java/org/apache/unomi/api/PartialList.java | 25 +++++ buildAndRunNoTests.sh | 34 ++++++ .../ElasticSearchPersistenceServiceImpl.java | 94 +++++++++++++--- .../persistence/spi/PersistenceService.java | 33 ++++++ .../PropertyConditionESQueryBuilder.java | 3 +- .../services/services/SegmentServiceImpl.java | 111 ++++++++++++------- .../resources/OSGI-INF/blueprint/blueprint.xml | 2 + 7 files changed, 242 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/api/src/main/java/org/apache/unomi/api/PartialList.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/unomi/api/PartialList.java b/api/src/main/java/org/apache/unomi/api/PartialList.java index 59f7e4d..daa69c9 100644 --- a/api/src/main/java/org/apache/unomi/api/PartialList.java +++ b/api/src/main/java/org/apache/unomi/api/PartialList.java @@ -37,6 +37,8 @@ public class PartialList<T> implements Serializable { private long offset; private long pageSize; private long totalSize; + private String scrollIdentifier = null; + private String scrollTimeValidity = null; /** * Instantiates a new PartialList. 
@@ -141,4 +143,27 @@ public class PartialList<T> implements Serializable { return list.get(index); } + /** + * Retrieve the scroll identifier to make it possible to continue a scrolling list query + * @return a string containing the scroll identifier, to be sent back in a subsequent request + */ + public String getScrollIdentifier() { + return scrollIdentifier; + } + + public void setScrollIdentifier(String scrollIdentifier) { + this.scrollIdentifier = scrollIdentifier; + } + + /** + * Retrieve the value of the scroll time validity to make it possible to continue a scrolling list query + * @return a string containing a time value for the scroll validity, to be sent back in a subsequent request + */ + public String getScrollTimeValidity() { + return scrollTimeValidity; + } + + public void setScrollTimeValidity(String scrollTimeValidity) { + this.scrollTimeValidity = scrollTimeValidity; + } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/buildAndRunNoTests.sh ---------------------------------------------------------------------- diff --git a/buildAndRunNoTests.sh b/buildAndRunNoTests.sh new file mode 100755 index 0000000..c8f7641 --- /dev/null +++ b/buildAndRunNoTests.sh @@ -0,0 +1,34 @@ +#!/bin/sh +################################################################################ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +################################################################################ +echo Building... +DIRNAME=`dirname "$0"` +PROGNAME=`basename "$0"` +if [ -f "$DIRNAME/setenv.sh" ]; then + . "$DIRNAME/setenv.sh" +fi +mvn clean install -P \!integration-tests,\!performance-tests,rat +pushd package/target +echo Uncompressing Unomi package... +tar zxvf unomi-$UNOMI_VERSION.tar.gz +cd unomi-$UNOMI_VERSION/bin +echo Starting Unomi... +./karaf debug +popd + http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 794b03b..d6a136e 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -695,7 +695,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy) { - return query(QueryBuilders.matchAllQuery(), sortBy, clazz, 
offset, size, null); + return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null, null); } @Override @@ -711,7 +711,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String itemType = (String) clazz.getField("ITEM_TYPE").get(null); if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) { - PartialList<T> r = query(QueryBuilders.idsQuery(itemType).ids(itemId), null, clazz, 0, 1, null); + PartialList<T> r = query(QueryBuilders.idsQuery(itemType).ids(itemId), null, clazz, 0, 1, null, null); if (r.size() > 0) { return r.get(0); } @@ -1128,12 +1128,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) { - return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null); + return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, null); + } + + @Override + public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size, final String scrollTimeValidity) { + return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, scrollTimeValidity); } @Override public <T extends Item> PartialList<T> queryFullText(final String fulltext, final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) { - return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(conditionESQueryBuilderDispatcher.getQueryBuilder(query)), sortBy, clazz, offset, size, null); + return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(conditionESQueryBuilderDispatcher.getQueryBuilder(query)), sortBy, clazz, offset, size, null, 
null); } @Override @@ -1143,22 +1148,22 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public <T extends Item> List<T> query(final String fieldName, final String[] fieldValues, String sortBy, final Class<T> clazz) { - return query(QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues)), sortBy, clazz, 0, -1, getRouting(fieldName, fieldValues, clazz)).getList(); + return query(QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues)), sortBy, clazz, 0, -1, getRouting(fieldName, fieldValues, clazz), null).getList(); } @Override public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) { - return query(QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz)); + return query(QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null); } @Override public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) { - return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(QueryBuilders.termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz)); + return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(QueryBuilders.termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null); } @Override public <T extends Item> PartialList<T> queryFullText(String fulltext, String sortBy, Class<T> clazz, int offset, int size) { - return 
query(QueryBuilders.queryStringQuery(fulltext).defaultField("_all"), sortBy, clazz, offset, size, getRouting("_all", new String[]{fulltext}, clazz)); + return query(QueryBuilders.queryStringQuery(fulltext).defaultField("_all"), sortBy, clazz, offset, size, getRouting("_all", new String[]{fulltext}, clazz), null); } @Override @@ -1166,7 +1171,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, RangeQueryBuilder builder = QueryBuilders.rangeQuery(fieldName); builder.from(from); builder.to(to); - return query(builder, sortBy, clazz, offset, size, null); + return query(builder, sortBy, clazz, offset, size, null, null); } @Override @@ -1193,21 +1198,35 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, }.executeInClassLoader(); } - private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing) { + private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing, final String scrollTimeValidity) { return new InClassLoaderExecute<PartialList<T>>() { @Override protected PartialList<T> execute(Object... 
args) { List<T> results = new ArrayList<T>(); + String scrollIdentifier = null; long totalHits = 0; try { String itemType = getItemType(clazz); + TimeValue keepAlive = TimeValue.timeValueHours(1); + SearchRequestBuilder requestBuilder = null; + if (scrollTimeValidity != null) { + keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueHours(1), "scrollTimeValidity"); + requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) + .setTypes(itemType) + .setFetchSource(true) + .setScroll(keepAlive) + .setFrom(offset) + .setQuery(query) + .setSize(size); + } else { + requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) + .setTypes(itemType) + .setFetchSource(true) + .setQuery(query) + .setFrom(offset); + } - SearchRequestBuilder requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) - .setTypes(itemType) - .setFetchSource(true) - .setQuery(query) - .setFrom(offset); if (size == Integer.MIN_VALUE) { requestBuilder.setSize(defaultQueryLimit); } else if (size != -1) { @@ -1244,6 +1263,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .execute() .actionGet(); SearchHits searchHits = response.getHits(); + scrollIdentifier = response.getScrollId(); totalHits = searchHits.getTotalHits(); for (SearchHit searchHit : searchHits) { String sourceAsString = searchHit.getSourceAsString(); @@ -1255,7 +1275,49 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.error("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t); } - return new PartialList<T>(results, offset, size, totalHits); + PartialList<T> result = new PartialList<T>(results, offset, size, totalHits); + if (scrollIdentifier != null && totalHits != 0) { + result.setScrollIdentifier(scrollIdentifier); + result.setScrollTimeValidity(scrollTimeValidity); + } + return result; + } + }.executeInClassLoader(); + } + + @Override + public <T extends Item> 
PartialList<T> continueScrollQuery(final Class<T> clazz, final String scrollIdentifier, final String scrollTimeValidity) { + return new InClassLoaderExecute<PartialList<T>>() { + + @Override + protected PartialList<T> execute(Object... args) { + List<T> results = new ArrayList<T>(); + long totalHits = 0; + try { + TimeValue keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueMinutes(10), "scrollTimeValidity"); + SearchResponse response = client.prepareSearchScroll(scrollIdentifier).setScroll(keepAlive).execute().actionGet(); + + if (response.getHits().getHits().length == 0) { + client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet(); + } else { + for (SearchHit searchHit : response.getHits().getHits()) { + // add hit to results + String sourceAsString = searchHit.getSourceAsString(); + final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); + value.setItemId(searchHit.getId()); + results.add(value); + } + } + PartialList<T> result = new PartialList<T>(results, 0, response.getHits().getHits().length, response.getHits().getTotalHits()); + if (scrollIdentifier != null) { + result.setScrollIdentifier(scrollIdentifier); + result.setScrollTimeValidity(scrollTimeValidity); + } + return result; + } catch (Exception t) { + logger.error("Error continuing scrolling query for itemType=" + clazz.getName() + " scrollIdentifier=" + scrollIdentifier + " scrollTimeValidity=" + scrollTimeValidity, t); + } + return null; } }.executeInClassLoader(); } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java ---------------------------------------------------------------------- diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java index 90b0efc..881c395 100644 --- 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java @@ -294,6 +294,39 @@ public interface PersistenceService { <T extends Item> PartialList<T> query(Condition query, String sortBy, Class<T> clazz, int offset, int size); /** + * Retrieves a list of items satisfying the specified {@link Condition}, ordered according to the specified {@code sortBy} String and paged: only {@code size} of them + * are retrieved, starting with the {@code offset}-th one. If a scroll time validity is specified, it will be used to perform a scrolling query, meaning + * that only partial results will be returned, but the scrolling can be continued. + * + * @param <T> the type of the Item subclass we want to retrieve + * @param query the {@link Condition} the items must satisfy to be retrieved + * @param sortBy an optional ({@code null} if no sorting is required) String of comma ({@code ,}) separated property names on which ordering should be performed, ordering + * elements according to the property order in the + * String, considering each in turn and moving on to the next one in case of equality of all preceding ones. Each property name is optionally followed by + * a colon ({@code :}) and an order specifier: {@code asc} or {@code desc}. + * @param clazz the {@link Item} subclass of the items we want to retrieve + * @param offset zero or a positive integer specifying the position of the first item in the total ordered collection of matching items + * @param size a positive integer specifying how many matching items should be retrieved or {@code -1} if all of them should be retrieved. In the case of a scroll query + * this will be used as the scrolling window size. + * @param scrollTimeValidity the time the scrolling query should stay valid.
This must contain a time unit value supported by ElasticSearch, such as + * the ones declared here : https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units + * @return a {@link PartialList} of items matching the specified criteria, with a scroll identifier and the scroll validity used if a scroll query was requested. + */ + <T extends Item> PartialList<T> query(Condition query, String sortBy, Class<T> clazz, int offset, int size, String scrollTimeValidity); + + /** + * Continues the execution of a scroll query, to retrieve the next results. If there are no more results the scroll query is also cleared. + * @param clazz the {@link Item} subclass of the items we want to retrieve + * @param scrollIdentifier a scroll identifier obtained by the execution of a first query and returned in the {@link PartialList} object + * @param scrollTimeValidity a scroll time validity value for the scroll query to stay valid. This must contain a time unit value supported by ElasticSearch, such as + * the ones declared here : https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units + * @param <T> the type of the Item subclass we want to retrieve + * @return a {@link PartialList} of items matching the specified criteria, with a scroll identifier and the scroll validity used if a scroll query was requested. Note that if + * there are no more results the list will be empty but not null. + */ + <T extends Item> PartialList<T> continueScrollQuery(Class<T> clazz, String scrollIdentifier, String scrollTimeValidity); + + /** * Retrieves the same items as {@code query(query, sortBy, clazz, 0, -1)} with the added constraints that the matching elements must also have at least a field matching the * specified full text query.
* http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java ---------------------------------------------------------------------- diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java index 7013581..c36722b 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java @@ -129,8 +129,9 @@ public class PropertyConditionESQueryBuilder implements ConditionESQueryBuilder case "isNotDay": checkRequiredValue(value, name, op, false); return QueryBuilders.boolQuery().mustNot(getIsSameDayRange(value, name)); + default: + throw new IllegalArgumentException("Impossible to build ES filter, unrecognized op=" + op); } - return null; } private void checkRequiredValuesSize(List<?> values, String name, String operator, int expectedSize) { http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java ---------------------------------------------------------------------- diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java index 3a64be1..2275d60 100644 --- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.unomi.api.*; import 
org.apache.unomi.api.actions.Action; import org.apache.unomi.api.conditions.Condition; +import org.apache.unomi.api.conditions.ConditionType; import org.apache.unomi.api.query.Query; import org.apache.unomi.api.rules.Rule; import org.apache.unomi.api.segments.*; @@ -62,6 +63,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList private List<Segment> allSegments; private List<Scoring> allScoring; private Timer segmentTimer; + private int segmentUpdateBatchSize = 1000; public SegmentServiceImpl() { logger.info("Initializing segment service..."); @@ -119,6 +121,10 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList this.rulesService = rulesService; } + public void setSegmentUpdateBatchSize(int segmentUpdateBatchSize) { + this.segmentUpdateBatchSize = segmentUpdateBatchSize; + } + public void postConstruct() { logger.debug("postConstruct {" + bundleContext.getBundle() + "}"); loadPredefinedSegments(bundleContext); @@ -819,64 +825,83 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList long t = System.currentTimeMillis(); Condition segmentCondition = new Condition(); + long updatedProfileCount = 0; + segmentCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition")); segmentCondition.setParameter("propertyName", "segments"); segmentCondition.setParameter("comparisonOperator", "equals"); segmentCondition.setParameter("propertyValue", segment.getItemId()); if(segment.getMetadata().isEnabled()) { - // the following list can grow really big if the segments are large. 
- // We might want to replace this with scrolling if it becomes huge - // (100million profiles) - List<Profile> previousProfiles = persistenceService.query(segmentCondition, null, Profile.class); - List<Profile> newProfiles = persistenceService.query(segment.getCondition(), null, Profile.class); - - // we use sets instead of lists to speed up contains() calls that are very expensive on lists. - - // we use to use removeAll calls but these are expensive because they require lots of copies upon element - // removal so we implemented them with adds instead. - //profilesToAdd.removeAll(previousProfiles); - //profilesToRemove.removeAll(newProfiles); - Set<Profile> newProfilesSet = new HashSet<>(newProfiles); - Set<Profile> previousProfilesSet = new HashSet<>(previousProfiles); - Set<Profile> profilesToAdd = new HashSet<>(newProfilesSet.size() / 2); - for (Profile newProfile : newProfilesSet) { - if (!previousProfilesSet.contains(newProfile)) { - profilesToAdd.add(newProfile); + ConditionType booleanConditionType = definitionsService.getConditionType("booleanCondition"); + ConditionType notConditionType = definitionsService.getConditionType("notCondition"); + + Condition profilesToAddCondition = new Condition(booleanConditionType); + profilesToAddCondition.setParameter("operator", "and"); + List<Condition> profilesToAddSubConditions = new ArrayList<>(); + profilesToAddSubConditions.add(segment.getCondition()); + Condition notOldSegmentCondition = new Condition(notConditionType); + notOldSegmentCondition.setParameter("subCondition", segmentCondition); + profilesToAddSubConditions.add(notOldSegmentCondition); + profilesToAddCondition.setParameter("subConditions", profilesToAddSubConditions); + + Condition profilesToRemoveCondition = new Condition(booleanConditionType); + profilesToRemoveCondition.setParameter("operator", "and"); + List<Condition> profilesToRemoveSubConditions = new ArrayList<>(); + profilesToRemoveSubConditions.add(segmentCondition); + Condition 
notNewSegmentCondition = new Condition(notConditionType); + notNewSegmentCondition.setParameter("subCondition", segment.getCondition()); + profilesToRemoveSubConditions.add(notNewSegmentCondition); + profilesToRemoveCondition.setParameter("subConditions", profilesToRemoveSubConditions); + + PartialList<Profile> profilesToRemove = persistenceService.query(profilesToRemoveCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m"); + PartialList<Profile> profilesToAdd = persistenceService.query(profilesToAddCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m"); + + while (profilesToAdd.getList().size() > 0) { + for (Profile profileToAdd : profilesToAdd.getList()) { + profileToAdd.getSegments().add(segment.getItemId()); + persistenceService.update(profileToAdd.getItemId(), null, Profile.class, "segments", profileToAdd.getSegments()); + Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date()); + profileUpdated.setPersistent(false); + eventService.send(profileUpdated); + updatedProfileCount++; } - } - Set<Profile> profilesToRemove = new HashSet<>(previousProfilesSet.size() / 2); - for (Profile previousProfile : previousProfilesSet) { - if (!newProfilesSet.contains(previousProfile)) { - profilesToRemove.add(previousProfile); + profilesToAdd = persistenceService.continueScrollQuery(Profile.class, profilesToAdd.getScrollIdentifier(), profilesToAdd.getScrollTimeValidity()); + if (profilesToAdd == null || profilesToAdd.getList().size() == 0) { + break; } } - - - for (Profile profileToAdd : profilesToAdd) { - profileToAdd.getSegments().add(segment.getItemId()); - persistenceService.update(profileToAdd.getItemId(), null, Profile.class, "segments", profileToAdd.getSegments()); - Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date()); - profileUpdated.setPersistent(false); - eventService.send(profileUpdated); - } - for (Profile profileToRemove : 
profilesToRemove) { - profileToRemove.getSegments().remove(segment.getItemId()); - persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments()); - Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date()); - profileUpdated.setPersistent(false); - eventService.send(profileUpdated); + while (profilesToRemove.getList().size() > 0) { + for (Profile profileToRemove : profilesToRemove.getList()) { + profileToRemove.getSegments().remove(segment.getItemId()); + persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments()); + Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date()); + profileUpdated.setPersistent(false); + eventService.send(profileUpdated); + updatedProfileCount++; + } + profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity()); + if (profilesToRemove == null || profilesToRemove.getList().size() == 0) { + break; + } } } else { - List<Profile> previousProfiles = persistenceService.query(segmentCondition, null, Profile.class); - for (Profile profileToRemove : previousProfiles) { - profileToRemove.getSegments().remove(segment.getItemId()); - persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments()); + PartialList<Profile> profilesToRemove = persistenceService.query(segmentCondition, null, Profile.class, 0, 200, "10m"); + while (profilesToRemove.getList().size() > 0) { + for (Profile profileToRemove : profilesToRemove.getList()) { + profileToRemove.getSegments().remove(segment.getItemId()); + persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments()); + updatedProfileCount++; + } + profilesToRemove = 
persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity()); + if (profilesToRemove == null || profilesToRemove.getList().size() == 0) { + break; + } } } - logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t); + logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis()-t); } private void updateExistingProfilesForScoring(Scoring scoring) { http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/28d7dbde/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml ---------------------------------------------------------------------- diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 5564e94..3176639 100644 --- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -29,6 +29,7 @@ <cm:property name="profile.purge.inactiveTime" value="30"/> <cm:property name="profile.purge.existTime" value="-1"/> <cm:property name="event.purge.existTime" value="12"/> + <cm:property name="segment.update.batchSize" value="1000"/> </cm:default-properties> </cm:property-placeholder> @@ -104,6 +105,7 @@ <property name="rulesService" ref="rulesServiceImpl"/> <property name="bundleContext" ref="blueprintBundleContext"/> <property name="taskExecutionPeriod" value="86400000"/> + <property name="segmentUpdateBatchSize" value="${services.segment.update.batchSize}" /> </bean> <service id="segmentService" ref="segmentServiceImpl" auto-export="interfaces"/>
