This is an automated email from the ASF dual-hosted git repository. shuber pushed a commit to branch unomi-1.5.x in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 2f8d397f240cd29023ebce29293b82cd477709e0 Author: liatiusim <[email protected]> AuthorDate: Tue Nov 10 10:51:05 2020 +0100 Add optimizations for past event queries (#208) (cherry picked from commit 628fb77ec77bfff8b82997db124e73899ee49fd8) --- .../main/resources/etc/custom.system.properties | 4 ++ .../ElasticSearchPersistenceServiceImpl.java | 81 +++++++++++++++++++--- .../resources/OSGI-INF/blueprint/blueprint.xml | 7 +- .../org.apache.unomi.persistence.elasticsearch.cfg | 11 +++ .../unomi/persistence/spi/PersistenceService.java | 12 ++++ .../PastEventConditionESQueryBuilder.java | 78 +++++++++++++-------- .../resources/OSGI-INF/blueprint/blueprint.xml | 2 + .../services/impl/segments/SegmentServiceImpl.java | 63 +++++++++++------ .../resources/OSGI-INF/blueprint/blueprint.xml | 2 + 9 files changed, 202 insertions(+), 58 deletions(-) diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties index f183969..a7b665c 100644 --- a/package/src/main/resources/etc/custom.system.properties +++ b/package/src/main/resources/etc/custom.system.properties @@ -90,6 +90,10 @@ org.apache.unomi.elasticsearch.defaultIndex.indexMaxDocValueFieldsSearch=${env:U org.apache.unomi.elasticsearch.defaultQueryLimit=${env:UNOMI_ELASTICSEARCH_DEFAULTQUERYLIMIT:-10} org.apache.unomi.elasticsearch.aggregateQueryBucketSize=${env:UNOMI_ELASTICSEARCH_AGGREGATEBUCKETSIZE:-5000} org.apache.unomi.elasticsearch.maximumIdsQueryCount=${env:UNOMI_ELASTICSEARCH_MAXIMUMIDSQUERYCOUNT:-5000} +org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-} +org.apache.unomi.elasticsearch.pastEventsDisablePartitions=${env:UNOMI_ELASTICSEARCH_PAST_EVENTS_DISABLE_PARTITIONS:-false} +org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_THROW_ON_MISSING_DOCS:-false} 
+org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_MAX_RESPONSE_SIZE_HTTP:-} # The following settings control the behavior of the BulkProcessor API. You can find more information about these # settings and their behavior here : https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.4/java-docs-bulk-processor.html # The values used here are the default values of the API diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 530395b..0bd2f13 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -23,6 +23,7 @@ import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.RequestConfig; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.lucene.search.TotalHits; @@ -55,7 +56,13 @@ import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.*; +import org.elasticsearch.client.Node; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; 
+import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.client.core.MainResponse; @@ -67,10 +74,7 @@ import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.query.IdsQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.*; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; @@ -91,6 +95,7 @@ import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBu import org.elasticsearch.search.aggregations.bucket.range.IpRangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -182,6 +187,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Set<String> itemClassesToCacheSet = new HashSet<>(); private String itemClassesToCache; private boolean useBatchingForSave = false; + private boolean aggQueryThrowOnMissingDocs = false; + private Integer aggQueryMaxResponseSizeHttp = null; + private Integer clientSocketTimeout = null; + private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>(); @@ -207,6 +216,12 @@ 
public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .map(i -> i.trim()).filter(i -> !i.isEmpty()).toArray(String[]::new); } + public void setAggQueryMaxResponseSizeHttp(String aggQueryMaxResponseSizeHttp) { + if (StringUtils.isNumeric(aggQueryMaxResponseSizeHttp)) { + this.aggQueryMaxResponseSizeHttp = Integer.parseInt(aggQueryMaxResponseSizeHttp); + } + } + public void setIndexPrefix(String indexPrefix) { this.indexPrefix = indexPrefix; } @@ -295,6 +310,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.aggregateQueryBucketSize = aggregateQueryBucketSize; } + public void setClientSocketTimeout(String clientSocketTimeout) { + if (StringUtils.isNumeric(clientSocketTimeout)) { + this.clientSocketTimeout = Integer.parseInt(clientSocketTimeout); + } + } + public void setMetricsService(MetricsService metricsService) { this.metricsService = metricsService; } @@ -336,6 +357,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.sslTrustAllCertificates = sslTrustAllCertificates; } + + public void setAggQueryThrowOnMissingDocs(boolean aggQueryThrowOnMissingDocs) { + this.aggQueryThrowOnMissingDocs = aggQueryThrowOnMissingDocs; + } public void start() throws Exception { // on startup @@ -416,6 +441,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, RestClientBuilder clientBuilder = RestClient.builder(nodeList.toArray(new Node[nodeList.size()])); + if (clientSocketTimeout != null) { + clientBuilder.setRequestConfigCallback(requestConfigBuilder -> { + requestConfigBuilder.setSocketTimeout(clientSocketTimeout); + return requestConfigBuilder; + }); + } + clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { if (sslTrustAllCertificates) { try { @@ -1466,6 +1498,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, searchSourceBuilder.version(true); searchRequest.source(searchSourceBuilder); 
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); + if (size == -1) { // Scroll until no more hits are returned while (true) { @@ -1576,16 +1609,21 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Deprecated @Override public Map<String, Long> aggregateQuery(Condition filter, BaseAggregate aggregate, String itemType) { - return aggregateQuery(filter, aggregate, itemType, false); + return aggregateQuery(filter, aggregate, itemType, false, aggregateQueryBucketSize); } @Override public Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType) { - return aggregateQuery(filter, aggregate, itemType, true); + return aggregateQuery(filter, aggregate, itemType, true, aggregateQueryBucketSize); + } + + @Override + public Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType, int size) { + return aggregateQuery(filter, aggregate, itemType, true, size); } private Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType, - final boolean optimizedQuery) { + final boolean optimizedQuery, int queryBucketSize) { return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery", this.bundleContext, this.fatalIllegalStateErrors) { @Override @@ -1647,7 +1685,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, fieldName = getPropertyNameWithData(fieldName, itemType); //default if (fieldName != null) { - bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(aggregateQueryBucketSize); + bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(queryBucketSize); if (aggregate instanceof TermsAggregate) { TermsAggregate termsAggregate = (TermsAggregate) aggregate; if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) { @@ 
-1696,9 +1734,21 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } searchRequest.source(searchSourceBuilder); - SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); + + RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); + + if (aggQueryMaxResponseSizeHttp != null) { + builder.setHttpAsyncResponseConsumerFactory( + new HttpAsyncResponseConsumerFactory + .HeapBufferedResponseConsumerFactory(aggQueryMaxResponseSizeHttp)); + } + + SearchResponse response = client.search(searchRequest, builder.build()); Aggregations aggregations = response.getAggregations(); + + if (aggregations != null) { + if (optimizedQuery) { if (response.getHits() != null) { results.put("_filtered", response.getHits().getTotalHits().value); @@ -1715,6 +1765,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } if (aggregations.get("buckets") != null) { + + if (aggQueryThrowOnMissingDocs) { + if (aggregations.get("buckets") instanceof Terms) { + Terms terms = aggregations.get("buckets"); + if (terms.getDocCountError() > 0 || terms.getSumOfOtherDocCounts() > 0) { + throw new UnsupportedOperationException("Some docs are missing in aggregation query. 
docCountError is:" + + terms.getDocCountError() + " sumOfOtherDocCounts:" + terms.getSumOfOtherDocCounts()); + } + } + } + long totalDocCount = 0; MultiBucketsAggregation terms = aggregations.get("buckets"); for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) { diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 5888c26..48402f3 100644 --- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -53,7 +53,9 @@ <cm:property name="maximalElasticSearchVersion" value="8.0.0" /> <cm:property name="aggregateQueryBucketSize" value="5000" /> - + <cm:property name="clientSocketTimeout" value="" /> + <cm:property name="aggQueryMaxResponseSizeHttp" value="" /> + <cm:property name="aggQueryThrowOnMissingDocs" value="false" /> <cm:property name="itemClassesToCache" value="" /> <cm:property name="useBatchingForSave" value="false" /> @@ -121,6 +123,9 @@ <property name="maximalElasticSearchVersion" value="${es.maximalElasticSearchVersion}" /> <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" /> + <property name="aggQueryMaxResponseSizeHttp" value="${es.aggQueryMaxResponseSizeHttp}" /> + <property name="aggQueryThrowOnMissingDocs" value="${es.aggQueryThrowOnMissingDocs}" /> + <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" /> <property name="metricsService" ref="metricsService" /> <property name="hazelcastInstance" ref="hazelcastInstance" /> diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index c6205ed..c0e7b46 100644 --- 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -55,6 +55,17 @@ aggregateQueryBucketSize=${org.apache.unomi.elasticsearch.aggregateQueryBucketSi # Maximum size allowed for an elastic "ids" query maximumIdsQueryCount=${org.apache.unomi.elasticsearch.maximumIdsQueryCount:-5000} +# Disable partitions on aggregation queries for past events. +pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePartitions:-false} + +# max socket timeout in millis +clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-} + +# Return an error if docs are missing in the ES aggregation calculation +aggQueryThrowOnMissingDocs=${org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs:-false} + +aggQueryMaxResponseSizeHttp=${org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp:-} + # Authentication username=${org.apache.unomi.elasticsearch.username:-} password=${org.apache.unomi.elasticsearch.password:-} diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java index 0e41df3..ce7d28e 100644 --- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java @@ -458,6 +458,18 @@ public interface PersistenceService { Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType); /** + * Retrieves the number of items with the specified type as defined by the Item subclass public field {@code ITEM_TYPE} matching the optional specified condition and + * aggregated according to the specified {@link BaseAggregate}. 
+ * + * @param filter the condition the items must match or {@code null} if no filtering is needed + * @param aggregate an aggregate specifying how matching items must be bundled + * @param itemType the String representation of the item type we want to retrieve the count of, as defined by its class' {@code ITEM_TYPE} field + * @param size size of returned buckets in the response + * @return a Map associating aggregation dimension name as key and cardinality for that dimension as value + */ + Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType, int size); + + /** * Updates the persistence's engine indices if needed. */ void refresh(); diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java index 6afffb1..3c676d4 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; import java.util.*; +import java.util.stream.Collectors; public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder { @@ -43,6 +44,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder private int maximumIdsQueryCount = 5000; private int aggregateQueryBucketSize = 5000; + private boolean pastEventsDisablePartitions = false; public void setDefinitionsService(DefinitionsService definitionsService) { this.definitionsService = definitionsService; @@ -64,6 +66,10 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder this.aggregateQueryBucketSize = 
aggregateQueryBucketSize; } + public void setPastEventsDisablePartitions(boolean pastEventsDisablePartitions) { + this.pastEventsDisablePartitions = pastEventsDisablePartitions; + } + public void setSegmentService(SegmentService segmentService) { this.segmentService = segmentService; } @@ -95,25 +101,34 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder Set<String> ids = new HashSet<>(); - // Get full cardinality to partition the terms aggreggation - Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE); - long card = m.get("_card").longValue(); - - int numParts = (int) (card / aggregateQueryBucketSize) + 2; - for (int i = 0; i < numParts; i++) { - Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE); - if (eventCountByProfile != null) { - eventCountByProfile.remove("_filtered"); - for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) { - if (entry.getValue() < minimumEventCount) { - // No more interesting buckets in this partition - break; - } else if (entry.getValue() <= maximumEventCount) { - ids.add(entry.getKey()); - - if (ids.size() > maximumIdsQueryCount) { - // Avoid building too big ids query - throw exception instead - throw new UnsupportedOperationException("Too many profiles"); + if (pastEventsDisablePartitions) { + Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount); + ids = eventCountByProfile.entrySet().stream() + .filter(x -> !x.getKey().equals("_filtered")) + .filter(x -> x.getValue() >= minimumEventCount && x.getValue() <= maximumEventCount) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } else { + // Get full cardinality to partition the terms aggreggation + 
Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE); + long card = m.get("_card").longValue(); + + int numParts = (int) (card / aggregateQueryBucketSize) + 2; + for (int i = 0; i < numParts; i++) { + Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE); + if (eventCountByProfile != null) { + eventCountByProfile.remove("_filtered"); + for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) { + if (entry.getValue() < minimumEventCount) { + // No more interesting buckets in this partition + break; + } else if (entry.getValue() <= maximumEventCount) { + ids.add(entry.getKey()); + + if (ids.size() > maximumIdsQueryCount) { + // Avoid building too big ids query - throw exception instead + throw new UnsupportedOperationException("Too many profiles"); + } } } } @@ -126,16 +141,28 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder public long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) { Condition eventCondition = getEventCondition(condition, context); + Map<String, Double> aggResult = null; Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 1 : (Integer) condition.getParameter("minimumEventCount"); Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ? 
Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount"); - // Get full cardinality to partition the terms aggreggation - Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE); - long card = m.get("_card").longValue(); + // No count filter - simply get the full number of distinct profiles + if (minimumEventCount == 1 && maximumEventCount == Integer.MAX_VALUE) { + aggResult = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE); + return aggResult.get("_card").longValue(); + } - if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) { - // Event count specified, must check occurences count for each profile + if (pastEventsDisablePartitions) { + Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount); + return eventCountByProfile.entrySet().stream() + .filter(x -> x.getKey().equals("_filtered")) + .filter(x -> x.getValue() >= minimumEventCount && x.getValue() <= maximumEventCount) + .count(); + } else { + // Get full cardinality to partition the terms aggreggation + aggResult = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE); + long card = aggResult.get("_card").longValue(); + // Event count specified, must check occurences count for each profile int result = 0; int numParts = (int) (card / aggregateQueryBucketSize) + 2; for (int i = 0; i < numParts; i++) { @@ -159,9 +186,6 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder } } return result; - } else { - // Simply get the full number of distinct profiles - return card; } } diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml 
b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml index cd75ce1..9f7169a 100644 --- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -34,6 +34,7 @@ <cm:default-properties> <cm:property name="maximumIdsQueryCount" value="5000"/> <cm:property name="aggregateQueryBucketSize" value="5000"/> + <cm:property name="pastEventsDisablePartitions" value="false"/> </cm:default-properties> </cm:property-placeholder> @@ -105,6 +106,7 @@ <property name="segmentService" ref="segmentService"/> <property name="scriptExecutor" ref="scriptExecutor" /> <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/> + <property name="pastEventsDisablePartitions" value="${es.pastEventsDisablePartitions}"/> <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}"/> </bean> </service> diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index 03400f1..36a2a67 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -66,6 +66,9 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe private long segmentRefreshInterval = 1000; private int aggregateQueryBucketSize = 5000; + private int maximumIdsQueryCount = 5000; + private boolean pastEventsDisablePartitions = false; + public SegmentServiceImpl() { logger.info("Initializing segment service..."); } @@ -94,6 +97,14 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe this.aggregateQueryBucketSize = aggregateQueryBucketSize; } + public void setMaximumIdsQueryCount(int maximumIdsQueryCount) { + this.maximumIdsQueryCount = maximumIdsQueryCount; + } + + public 
void setPastEventsDisablePartitions(boolean pastEventsDisablePartitions) { + this.pastEventsDisablePartitions = pastEventsDisablePartitions; + } + public void setSegmentRefreshInterval(long segmentRefreshInterval) { this.segmentRefreshInterval = segmentRefreshInterval; } @@ -797,28 +808,19 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe endDateCondition.setParameter("propertyValueDate", toDate); l.add(endDateCondition); } - String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey"); - Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE); - long card = m.get("_card").longValue(); + String propertyKey = (String) parentCondition.getParameter("generatedPropertyKey"); - int numParts = (int) (card / aggregateQueryBucketSize) + 2; - for (int i = 0; i < numParts; i++) { - Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE); - for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) { - String profileId = entry.getKey(); - if (!profileId.startsWith("_")) { - Map<String, Long> pastEventCounts = new HashMap<>(); - pastEventCounts.put(propertyKey, entry.getValue()); - Map<String, Object> systemProperties = new HashMap<>(); - systemProperties.put("pastEvents", pastEventCounts); - try { - systemProperties.put("lastUpdated", new Date()); - persistenceService.update(profileId, null, Profile.class, "systemProperties", systemProperties); - } catch (Exception e) { - logger.error("Error updating profile {} past event system properties", profileId, e); - } - } + if(pastEventsDisablePartitions) { + Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE, maximumIdsQueryCount); + 
updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey); + } else { + Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE); + long card = m.get("_card").longValue(); + int numParts = (int) (card / aggregateQueryBucketSize) + 2; + for (int i = 0; i < numParts; i++) { + Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE); + updateProfilesWithPastEventProperty(eventCountByProfile, propertyKey); } } @@ -848,6 +850,27 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe } } + private void updateProfilesWithPastEventProperty(Map<String, Long> eventCountByProfile, String propertyKey) { + for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) { + String profileId = entry.getKey(); + if (!profileId.startsWith("_")) { + Map<String, Long> pastEventCounts = new HashMap<>(); + pastEventCounts.put(propertyKey, entry.getValue()); + Map<String, Object> systemProperties = new HashMap<>(); + systemProperties.put("pastEvents", pastEventCounts); + try { + systemProperties.put("lastUpdated", new Date()); + Profile profile = new Profile(); + profile.setItemId(profileId); + persistenceService.update(profile.getItemId(), null, Profile.class, "systemProperties", systemProperties); + } catch (Exception e) { + logger.error("Error updating profile {} past event system properties", profileId, e); + } + } + } + + } + private String getMD5(String md5) { try { MessageDigest md = MessageDigest.getInstance("MD5"); diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml index b2cf0c0..3e7b0eb 100644 --- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -172,6 +172,8 @@ <property 
name="schedulerService" ref="schedulerServiceImpl"/> <property name="segmentRefreshInterval" value="${services.segment.refresh.interval}"/> <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" /> + <property name="pastEventsDisablePartitions" value="${es.pastEventsDisablePartitions}"/> + <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/> </bean> <service id="segmentService" ref="segmentServiceImpl"> <interfaces>
