This is an automated email from the ASF dual-hosted git repository. shuber pushed a commit to branch unomi-1.5.x in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 55c8bcdd930d52ef6bee412e3fa8264357af2b42 Author: giladw <[email protected]> AuthorDate: Tue Jan 5 14:23:55 2021 +0100 UNOMI-371 add optional support for optimistic concurrency control (if_seq_no) (#223) (cherry picked from commit 714a643efe0e0ccd66f14d4a8cff12545f7f94cd) --- api/src/main/java/org/apache/unomi/api/Item.java | 10 ++ .../apache/unomi/services/UserListServiceImpl.java | 2 +- .../unomi/privacy/internal/PrivacyServiceImpl.java | 2 +- .../itests/ProfileServiceWithoutOverwriteIT.java | 123 +++++++++++++++++++++ .../ElasticSearchPersistenceServiceImpl.java | 110 ++++++++++++++---- .../resources/OSGI-INF/blueprint/blueprint.xml | 4 + .../org.apache.unomi.persistence.elasticsearch.cfg | 8 +- .../unomi/persistence/spi/PersistenceService.java | 36 +++++- .../actions/MergeProfilesOnPropertyAction.java | 6 +- .../unomi/plugins/mail/actions/SendMailAction.java | 2 +- .../services/impl/events/EventServiceImpl.java | 2 +- .../services/impl/rules/RulesServiceImpl.java | 2 +- .../services/impl/segments/SegmentServiceImpl.java | 12 +- 13 files changed, 276 insertions(+), 43 deletions(-) diff --git a/api/src/main/java/org/apache/unomi/api/Item.java b/api/src/main/java/org/apache/unomi/api/Item.java index 72f0ae6..2d5ff71 100644 --- a/api/src/main/java/org/apache/unomi/api/Item.java +++ b/api/src/main/java/org/apache/unomi/api/Item.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -63,6 +64,7 @@ public abstract class Item implements Serializable { protected String itemType; protected String scope; protected Long version; + protected Map<String, Object> systemMetadata = new HashMap<>(); public Item() { this.itemType = getItemType(this.getClass()); @@ -140,4 +142,12 @@ public abstract class Item implements Serializable { public void setVersion(Long version) { this.version = version; } + + public Object 
getSystemMetadata(String key) { + return systemMetadata.get(key); + } + + public void setSystemMetadata(String key, Object value) { + systemMetadata.put(key, value); + } } diff --git a/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java b/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java index dc3bbc8..37ca72e 100644 --- a/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java +++ b/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java @@ -94,7 +94,7 @@ public class UserListServiceImpl implements UserListService { if(index != -1){ ((List) profileSystemProperties.get("lists")).remove(index); profileSystemProperties.put("lastUpdated", new Date()); - persistenceService.update(p.getItemId(), null, Profile.class, "systemProperties", profileSystemProperties); + persistenceService.update(p, null, Profile.class, "systemProperties", profileSystemProperties); } } } diff --git a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java index 1675e11..3d3d68d 100644 --- a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java +++ b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java @@ -137,7 +137,7 @@ public class PrivacyServiceImpl implements PrivacyService { persistenceService.save(session); List<Event> events = eventService.searchEvents(session.getItemId(), new String[0], null, 0, -1, null).getList(); for (Event event : events) { - persistenceService.update(event.getItemId(), event.getTimeStamp(), Event.class, "profileId", newProfile.getItemId()); + persistenceService.update(event, event.getTimeStamp(), 
Event.class, "profileId", newProfile.getItemId()); } } diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java new file mode 100644 index 0000000..5c2ed5b --- /dev/null +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ +package org.apache.unomi.itests; + +import org.apache.unomi.api.Profile; +import org.apache.unomi.api.services.DefinitionsService; +import org.apache.unomi.api.services.ProfileService; +import org.apache.unomi.persistence.spi.PersistenceService; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.ops4j.pax.exam.Configuration; +import org.ops4j.pax.exam.Option; +import org.ops4j.pax.exam.junit.PaxExam; +import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy; +import org.ops4j.pax.exam.spi.reactors.PerSuite; +import org.ops4j.pax.exam.util.Filter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.ops4j.pax.exam.CoreOptions.systemProperty; + +/** + * An integration test for the profile service + */ +@RunWith(PaxExam.class) +@ExamReactorStrategy(PerSuite.class) +public class ProfileServiceWithoutOverwriteIT extends BaseIT { + private final static Logger LOGGER = LoggerFactory.getLogger(ProfileServiceWithoutOverwriteIT.class); + + private final static String TEST_PROFILE_ID = "test-profile-id"; + + @Configuration + public Option[] config() throws InterruptedException { + List<Option> options = new ArrayList<>(); + options.addAll(Arrays.asList(super.config())); + options.add(systemProperty("org.apache.unomi.elasticsearch.throwExceptions").value("true")); + options.add(systemProperty("org.apache.unomi.elasticsearch.alwaysOverwrite").value("false")); + return options.toArray(new Option[0]); + } + + @Inject @Filter(timeout = 600000) + protected ProfileService profileService; + + @Inject + @Filter(timeout = 600000) + protected PersistenceService persistenceService; + + @Inject + @Filter(timeout = 600000) + protected DefinitionsService 
definitionsService; + + @Before + public void setUp() { + TestUtils.removeAllProfiles(definitionsService, persistenceService); + } + + private Profile setupWithoutOverwriteTests() { + Profile profile = new Profile(); + profile.setItemId(TEST_PROFILE_ID); + profile.setProperty("country", "test-country"); + profile.setProperty("state", "test-state"); + profileService.save(profile); + + return profile; + } + + @Test(expected = RuntimeException.class) + public void testSaveProfileWithoutOverwriteSameProfileThrowsException() { + Profile profile = setupWithoutOverwriteTests(); + profile.setProperty("country", "test2-country"); + profileService.save(profile); + } + + @Test + public void testSaveProfileWithoutOverwriteSavesAfterReload() throws InterruptedException { + Profile profile = setupWithoutOverwriteTests(); + String profileId = profile.getItemId(); + Thread.sleep(4000); + + Profile updatedProfile = profileService.load(profileId); + updatedProfile.setProperty("country", "test2-country"); + profileService.save(updatedProfile); + + Thread.sleep(4000); + + Profile profileWithNewCountry = profileService.load(profileId); + assertEquals(profileWithNewCountry.getProperty("country"), "test2-country"); + } + + @Test(expected = RuntimeException.class) + public void testSaveProfileWithoutOverwriteWrongSeqNoThrowsException() throws InterruptedException { + Profile profile = setupWithoutOverwriteTests(); + String profileId = profile.getItemId(); + + Thread.sleep(4000); + + Profile updatedProfile = profileService.load(profileId); + updatedProfile.setProperty("country", "test2-country"); + updatedProfile.setMetadata("seq_no", 1L); + profileService.save(updatedProfile); + } +} diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 62aaa81..5c4a251 100644 --- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -23,7 +23,6 @@ import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.config.RequestConfig; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.lucene.search.TotalHits; @@ -40,6 +39,7 @@ import org.apache.unomi.persistence.elasticsearch.conditions.*; import org.apache.unomi.persistence.spi.PersistenceService; import org.apache.unomi.persistence.spi.aggregate.*; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; @@ -49,6 +49,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -63,6 +64,7 @@ import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.core.CountRequest; import 
org.elasticsearch.client.core.CountResponse; import org.elasticsearch.client.core.MainResponse; @@ -139,6 +141,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval"; public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy"; public static final String INDEX_DATE_PREFIX = "date-"; + public static final String SEQ_NO = "seq_no"; + public static final String PRIMARY_TERM = "primary_term"; + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); private RestHighLevelClient client; private BulkProcessor bulkProcessor; @@ -192,6 +197,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Integer aggQueryMaxResponseSizeHttp = null; private Integer clientSocketTimeout = null; + private boolean alwaysOverwrite = true; private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>(); @@ -362,6 +368,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public void setAggQueryThrowOnMissingDocs(boolean aggQueryThrowOnMissingDocs) { this.aggQueryThrowOnMissingDocs = aggQueryThrowOnMissingDocs; } + + public void setThrowExceptions(boolean throwExceptions) { + this.throwExceptions = throwExceptions; + } + public void setAlwaysOverwrite(boolean alwaysOverwrite) { + this.alwaysOverwrite = alwaysOverwrite; + } + + public void start() throws Exception { // on startup @@ -747,8 +762,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (response.isExists()) { String sourceAsString = response.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - value.setItemId(response.getId()); - value.setVersion(response.getVersion()); + setMetadata(value, response.getId(), response.getVersion(), 
response.getSeqNo(), response.getPrimaryTerm()); putInCache(itemId, value); return value; } else { @@ -772,13 +786,28 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } + private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm) { + item.setItemId(id); + item.setVersion(version); + item.setSystemMetadata(SEQ_NO, seqNo); + item.setSystemMetadata(PRIMARY_TERM, primaryTerm); + } + @Override public boolean save(final Item item) { - return save(item, useBatchingForSave); + return save(item, useBatchingForSave, alwaysOverwrite); } @Override public boolean save(final Item item, final boolean useBatching) { + return save(item, useBatching, alwaysOverwrite); + } + + @Override + public boolean save(final Item item, final Boolean useBatchingOption, final Boolean alwaysOverwriteOption) { + final boolean useBatching = useBatchingOption == null ? this.useBatchingForSave : useBatchingOption; + final boolean alwaysOverwrite = alwaysOverwriteOption == null ? this.alwaysOverwrite : alwaysOverwriteOption; + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... 
args) throws Exception { try { @@ -790,13 +819,28 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(itemId); indexRequest.source(source, XContentType.JSON); + + if (!alwaysOverwrite) { + Long seqNo = (Long)item.getSystemMetadata(SEQ_NO); + Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM); + + if (seqNo != null && primaryTerm != null) { + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); + } + else { + indexRequest.opType(DocWriteRequest.OpType.CREATE); + } + } + if (routingByType.containsKey(itemType)) { indexRequest.routing(routingByType.get(itemType)); } try { if (bulkProcessor == null || !useBatching) { - client.index(indexRequest, RequestOptions.DEFAULT); + IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT); + setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm()); } else { bulkProcessor.add(indexRequest); } @@ -819,26 +863,43 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } @Override - public boolean update(final String itemId, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) { - return update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue)); + public boolean update(final Item item, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) { + return update(item, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue)); } @Override - public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) { + public boolean update(final Item item, final Date dateHint, final Class clazz, final Map source) { + return update(item, dateHint, clazz, source, alwaysOverwrite); + } + + @Override + public boolean update(final Item item, final Date dateHint, final 
Class clazz, final Map source, final boolean alwaysOverwrite) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); - UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), itemId); + UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId()); updateRequest.doc(source); + + if (!alwaysOverwrite) { + Long seqNo = (Long)item.getSystemMetadata(SEQ_NO); + Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM); + + if (seqNo != null && primaryTerm != null) { + updateRequest.setIfSeqNo(seqNo); + updateRequest.setIfPrimaryTerm(primaryTerm); + } + } + if (bulkProcessor == null) { - client.update(updateRequest, RequestOptions.DEFAULT); + UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT); + setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm()); } else { bulkProcessor.add(updateRequest); } return true; } catch (IndexNotFoundException e) { - throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); + throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + item.getItemId(), e); } } }.catchingExecuteInClassLoader(true); @@ -907,7 +968,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } @Override - public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) { + public boolean updateWithScript(final Item item, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + 
".updateWithScript", this.bundleContext, this.fatalIllegalStateErrors) { protected Boolean execute(Object... args) throws Exception { try { @@ -917,17 +978,26 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams); - UpdateRequest updateRequest = new UpdateRequest(index, itemId); + UpdateRequest updateRequest = new UpdateRequest(index, item.getItemId()); + + Long seqNo = (Long)item.getSystemMetadata(SEQ_NO); + Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM); + + if (seqNo != null && primaryTerm != null) { + updateRequest.setIfSeqNo(seqNo); + updateRequest.setIfPrimaryTerm(primaryTerm); + } updateRequest.script(actualScript); if (bulkProcessor == null) { - client.update(updateRequest, RequestOptions.DEFAULT); + UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT); + setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm()); } else { bulkProcessor.add(updateRequest); } return true; } catch (IndexNotFoundException e) { - throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); + throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + item.getItemId(), e); } } }.catchingExecuteInClassLoader(true); @@ -1471,6 +1541,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .fetchSource(true) + .seqNoAndPrimaryTerm(true) .query(query) .size(size < 0 ? 
defaultQueryLimit : size) .from(offset); @@ -1528,8 +1599,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // add hit to results String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - value.setItemId(searchHit.getId()); - value.setVersion(searchHit.getVersion()); + setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm()); results.add(value); } @@ -1559,8 +1629,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (SearchHit searchHit : searchHits) { String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - value.setItemId(searchHit.getId()); - value.setVersion(searchHit.getVersion()); + setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm()); results.add(value); } } @@ -1606,8 +1675,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // add hit to results String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); - value.setItemId(searchHit.getId()); - value.setVersion(searchHit.getVersion()); + setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm()); results.add(value); } } diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 48402f3..6f40799 100644 --- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -63,6 +63,8 @@ <cm:property name="password" value="" /> <cm:property 
name="sslEnable" value="false" /> <cm:property name="sslTrustAllCertificates" value="false" /> + <cm:property name="throwExceptions" value="false" /> + <cm:property name="alwaysOverwrite" value="true" /> </cm:default-properties> </cm:property-placeholder> @@ -136,6 +138,8 @@ <property name="password" value="${es.password}" /> <property name="sslEnable" value="${es.sslEnable}" /> <property name="sslTrustAllCertificates" value="${es.sslTrustAllCertificates}" /> + <property name="throwExceptions" value="${es.throwExceptions}" /> + <property name="alwaysOverwrite" value="${es.alwaysOverwrite}" /> </bean> <!-- We use a listener here because using the list directly for listening to proxies coming from the same bundle didn't seem to work --> diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index c0e7b46..ac30c91 100644 --- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -26,7 +26,6 @@ monthlyIndex.numberOfShards=${org.apache.unomi.elasticsearch.monthlyIndex.nbShar monthlyIndex.numberOfReplicas=${org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas:-0} monthlyIndex.indexMappingTotalFieldsLimit=${org.apache.unomi.elasticsearch.monthlyIndex.indexMappingTotalFieldsLimit:-1000} monthlyIndex.indexMaxDocValueFieldsSearch=${org.apache.unomi.elasticsearch.monthlyIndex.indexMaxDocValueFieldsSearch:-1000} -monthlyIndex.itemsMonthlyIndexedOverride=${org.apache.unomi.elasticsearch.monthlyIndex.itemsMonthlyIndexedOverride:-event,session} numberOfShards=${org.apache.unomi.elasticsearch.defaultIndex.nbShards:-5} numberOfReplicas=${org.apache.unomi.elasticsearch.defaultIndex.nbReplicas:-0} 
indexMappingTotalFieldsLimit=${org.apache.unomi.elasticsearch.defaultIndex.indexMappingTotalFieldsLimit:-1000} @@ -70,4 +69,9 @@ aggQueryMaxResponseSizeHttp=${org.apache.unomi.elasticsearch.aggQueryMaxResponse username=${org.apache.unomi.elasticsearch.username:-} password=${org.apache.unomi.elasticsearch.password:-} sslEnable=${org.apache.unomi.elasticsearch.sslEnable:-false} -sslTrustAllCertificates=${org.apache.unomi.elasticsearch.sslTrustAllCertificates:-false} \ No newline at end of file +sslTrustAllCertificates=${org.apache.unomi.elasticsearch.sslTrustAllCertificates:-false} + +# Errors +throwExceptions=${org.apache.unomi.elasticsearch.throwExceptions:-false} + +alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true} \ No newline at end of file diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java index d85059e..f92d3bb 100644 --- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java @@ -127,41 +127,65 @@ public interface PersistenceService { boolean save(Item item, boolean useBatching); /** + * Persists the specified Item in the context server. + * + * @param item the item to persist + * @param useBatching whether to use batching or not for saving the item. 
If activated, there may be a delay between + * the call to this method and the actual saving in the persistence backend + * @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving + * + * @return {@code true} if the item was properly persisted, {@code false} otherwise + */ + boolean save(Item item, Boolean useBatching, Boolean alwaysOverwrite); + + /** * Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map. * - * @param itemId the identifier of the item we want to update + * @param item the item we want to update * @param dateHint a Date helping in identifying where the item is located * @param clazz the Item subclass of the item to update * @param source a Map with entries specifying as key the property name to update and as value its new value * @return {@code true} if the update was successful, {@code false} otherwise */ - boolean update(String itemId, Date dateHint, Class<?> clazz, Map<?, ?> source); + boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source); /** * Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. 
Same as * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))} * - * @param itemId the identifier of the item we want to update + * @param item the item we want to update * @param dateHint a Date helping in identifying where the item is located * @param clazz the Item subclass of the item to update * @param propertyName the name of the property to update * @param propertyValue the new value of the property * @return {@code true} if the update was successful, {@code false} otherwise */ - boolean update(String itemId, Date dateHint, Class<?> clazz, String propertyName, Object propertyValue); + boolean update(Item item, Date dateHint, Class<?> clazz, String propertyName, Object propertyValue); + + /** + * Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map. + * + * @param item the item we want to update + * @param dateHint a Date helping in identifying where the item is located + * @param clazz the Item subclass of the item to update + * @param source a Map with entries specifying as key the property name to update and as value its new value + * @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving + * @return {@code true} if the update was successful, {@code false} otherwise + */ + boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source, final boolean alwaysOverwrite); /** * Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. 
Same as * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))} * - * @param itemId the identifier of the item we want to update + * @param item the item we want to update * @param dateHint a Date helping in identifying where the item is located * @param clazz the Item subclass of the item to update * @param script inline script * @param scriptParams script params * @return {@code true} if the update was successful, {@code false} otherwise */ - boolean updateWithScript(String itemId, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams); + boolean updateWithScript(Item item, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams); /** * Updates the items of the specified class by a query with a new property value for the specified property name diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java index a496ddb..ffdf626 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java @@ -178,12 +178,12 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { } for (Session session : sessions) { - persistenceService.update(session.getItemId(), session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId); + persistenceService.update(session, session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId); } List<Event> events = persistenceService.query("profileId", profileId, null, Event.class); for (Event event : events) { - persistenceService.update(event.getItemId(), event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? 
null : masterProfileId); + persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId); } // we must mark all the profiles that we merged into the master as merged with the master, and they will // be deleted upon next load @@ -192,7 +192,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { sourceMap.put("mergedWith", masterProfileId); profile.setSystemProperty("lastUpdated", new Date()); sourceMap.put("systemProperties", profile.getSystemProperties()); - persistenceService.update(profile.getItemId(), null, Profile.class, sourceMap); + persistenceService.update(profile, null, Profile.class, sourceMap); } } } catch (Exception e) { diff --git a/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java b/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java index 8e03175..acdb1eb 100644 --- a/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java +++ b/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java @@ -116,7 +116,7 @@ public class SendMailAction implements ActionExecutor { event.getProfile().setSystemProperty("notificationAck", profileNotif); event.getProfile().setSystemProperty("lastUpdated", new Date()); - persistenceService.update(event.getProfile().getItemId(), null, Profile.class, "systemProperties", event.getProfile().getSystemProperties()); + persistenceService.update(event.getProfile(), null, Profile.class, "systemProperties", event.getProfile().getSystemProperties()); ST stringTemplate = new ST(template, '$', '$'); stringTemplate.add("profile", event.getProfile()); diff --git a/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java index 80b0449..25db0b5 100644 --- 
a/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java @@ -152,7 +152,7 @@ public class EventServiceImpl implements EventService { boolean saveSucceeded = true; if (event.isPersistent()) { - saveSucceeded = persistenceService.save(event); + saveSucceeded = persistenceService.save(event, null, true); } int changes; diff --git a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java index d77173c..734d856 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java @@ -475,7 +475,7 @@ public class RulesServiceImpl implements RulesService, EventListenerService, Syn } allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics); if (mustPersist) { - persistenceService.save(ruleStatistics); + persistenceService.save(ruleStatistics, null, true); } } // now let's iterate over the rules coming from the persistence service, as we may have new ones. 
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java index 36a2a67..e971432 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java @@ -365,7 +365,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe sourceMap.put("segments", profileToRemove.getSegments()); profileToRemove.setSystemProperty("lastUpdated", new Date()); sourceMap.put("systemProperties", profileToRemove.getSystemProperties()); - persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap); + persistenceService.update(profileToRemove, null, Profile.class, sourceMap); updatedProfileCount++; } logger.info("Removed segment from {} profiles in {} ms", updatedProfileCount, System.currentTimeMillis() - profileRemovalStartTime); @@ -724,7 +724,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe // todo remove profile properties ? 
persistenceService.remove(previousRule.getItemId(), Rule.class); } else { - persistenceService.update(previousRule.getItemId(), null, Rule.class, "linkedItems", previousRule.getLinkedItems()); + persistenceService.update(previousRule, null, Rule.class, "linkedItems", previousRule.getLinkedItems()); } } } @@ -862,7 +862,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe systemProperties.put("lastUpdated", new Date()); Profile profile = new Profile(); profile.setItemId(profileId); - persistenceService.update(profile.getItemId(), null, Profile.class, "systemProperties", systemProperties); + persistenceService.update(profile, null, Profile.class, "systemProperties", systemProperties); } catch (Exception e) { logger.error("Error updating profile {} past event system properties", profileId, e); } @@ -930,7 +930,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe sourceMap.put("segments", profileToAdd.getSegments()); profileToAdd.setSystemProperty("lastUpdated", new Date()); sourceMap.put("systemProperties", profileToAdd.getSystemProperties()); - persistenceService.update(profileToAdd.getItemId(), null, Profile.class, sourceMap); + persistenceService.update(profileToAdd, null, Profile.class, sourceMap); Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date()); profileUpdated.setPersistent(false); eventService.send(profileUpdated); @@ -950,7 +950,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe sourceMap.put("segments", profileToRemove.getSegments()); profileToRemove.setSystemProperty("lastUpdated", new Date()); sourceMap.put("systemProperties", profileToRemove.getSystemProperties()); - persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap); + persistenceService.update(profileToRemove, null, Profile.class, sourceMap); Event profileUpdated = new Event("profileUpdated", null, 
profileToRemove, null, null, profileToRemove, new Date()); profileUpdated.setPersistent(false); eventService.send(profileUpdated); @@ -973,7 +973,7 @@ public class SegmentServiceImpl extends AbstractServiceImpl implements SegmentSe sourceMap.put("segments", profileToRemove.getSegments()); profileToRemove.setSystemProperty("lastUpdated", new Date()); sourceMap.put("systemProperties", profileToRemove.getSystemProperties()); - persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap); + persistenceService.update(profileToRemove, null, Profile.class, sourceMap); Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date()); profileUpdated.setPersistent(false); eventService.send(profileUpdated);
