This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch unomi-1.5.x
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/unomi-1.5.x by this push:
     new 8b0bffa  Expose refresh policy (#280)
8b0bffa is described below

commit 8b0bffa53bef73c3e6d81ec607ecaad54c45ea3b
Author: liatiusim <[email protected]>
AuthorDate: Wed Apr 21 13:51:12 2021 +0300

    Expose refresh policy (#280)
    
    (cherry picked from commit a987d4b81d77ed3c4810f22011feae0da241022a)
---
 .../main/resources/etc/custom.system.properties    |  4 ++++
 .../ElasticSearchPersistenceServiceImpl.java       | 24 ++++++++++++++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  3 +++
 .../org.apache.unomi.persistence.elasticsearch.cfg |  5 +++++
 .../unomi/persistence/spi/PersistenceService.java  |  8 ++++++++
 .../actions/SetEventOccurenceCountAction.java      |  6 ++++--
 6 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/package/src/main/resources/etc/custom.system.properties 
b/package/src/main/resources/etc/custom.system.properties
index a09c922..49552fb 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -100,6 +100,10 @@ 
org.apache.unomi.elasticsearch.cluster.name=${env:UNOMI_ELASTICSEARCH_CLUSTERNAM
 # hostA:9200,hostB:9200
 # Note: the port number must be repeated for each host.
 
org.apache.unomi.elasticsearch.addresses=${env:UNOMI_ELASTICSEARCH_ADDRESSES:-localhost:9200}
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is 
NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy=${env:UNOMI_ELASTICSEARCH_REFRESH_POLICY_PER_ITEM_TYPE:-}
 
org.apache.unomi.elasticsearch.fatalIllegalStateErrors=${env:UNOMI_ELASTICSEARCH_FATAL_STATE_ERRORS:-}
 
org.apache.unomi.elasticsearch.index.prefix=${env:UNOMI_ELASTICSEARCH_INDEXPREFIX:-context}
 
org.apache.unomi.elasticsearch.monthlyIndex.nbShards=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_SHARDS:-5}
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index dc68056..ccb4742 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.unomi.persistence.elasticsearch;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.hazelcast.core.HazelcastInstance;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
@@ -124,6 +126,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
@@ -199,6 +202,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
     private boolean aggQueryThrowOnMissingDocs = false;
     private Integer aggQueryMaxResponseSizeHttp = null;
     private Integer clientSocketTimeout = null;
+    private Map<String, WriteRequest.RefreshPolicy> itemTypeToRefreshPolicy = 
new HashMap<>();
 
     private Map<String, Map<String, Map<String, Object>>> knownMappings = new 
HashMap<>();
 
@@ -219,6 +223,13 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         }
     }
 
+    public void setItemTypeToRefreshPolicy(String itemTypeToRefreshPolicy) 
throws IOException {
+        if (!itemTypeToRefreshPolicy.isEmpty()) {
+            this.itemTypeToRefreshPolicy = new 
ObjectMapper().readValue(itemTypeToRefreshPolicy,
+                        new TypeReference<HashMap<String, 
WriteRequest.RefreshPolicy>>() {});
+        }
+    }
+
     public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
         this.fatalIllegalStateErrors = 
Arrays.stream(fatalIllegalStateErrors.split(","))
                 .map(i -> i.trim()).filter(i -> 
!i.isEmpty()).toArray(String[]::new);
@@ -799,6 +810,11 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
     }
 
     @Override
+    public boolean isConsistent(Item item) {
+        return getRefreshPolicy(item.getItemType()) != 
WriteRequest.RefreshPolicy.NONE;
+    }
+
+    @Override
     public boolean save(final Item item) {
         return save(item, useBatchingForSave, alwaysOverwrite);
     }
@@ -844,6 +860,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
 
                     try {
                         if (bulkProcessor == null || !useBatching) {
+                            
indexRequest.setRefreshPolicy(getRefreshPolicy(item.getItemType()));
                             IndexResponse response = 
client.index(indexRequest, RequestOptions.DEFAULT);
                             setMetadata(item, response.getId(), 
response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
                         } else {
@@ -2279,4 +2296,11 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         return INDEX_DATE_PREFIX + d;
     }
 
+    private WriteRequest.RefreshPolicy getRefreshPolicy(String itemType) {
+        if (itemTypeToRefreshPolicy.containsKey(itemType)) {
+            return itemTypeToRefreshPolicy.get(itemType);
+        }
+        return WriteRequest.RefreshPolicy.NONE;
+    }
+
 }
diff --git 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 8a52c5f..2ec7d2d 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -56,6 +56,7 @@
             <cm:property name="clientSocketTimeout" value="" />
             <cm:property name="aggQueryMaxResponseSizeHttp" value="" />
             <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
+            <cm:property name="itemTypeToRefreshPolicy" value="" />
             <cm:property name="itemClassesToCache" value="" />
             <cm:property name="useBatchingForSave" value="false" />
             <cm:property name="useBatchingForUpdate" value="true" />
@@ -128,6 +129,8 @@
         <property name="aggregateQueryBucketSize" 
value="${es.aggregateQueryBucketSize}" />
         <property name="aggQueryMaxResponseSizeHttp" 
value="${es.aggQueryMaxResponseSizeHttp}" />
         <property name="aggQueryThrowOnMissingDocs" 
value="${es.aggQueryThrowOnMissingDocs}" />
+        <property name="itemTypeToRefreshPolicy" 
value="${es.itemTypeToRefreshPolicy}" />
+
         <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" 
/>
 
         <property name="metricsService" ref="metricsService" />
diff --git 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ce2fb67..71c2577 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -60,6 +60,11 @@ 
pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePa
 # max socket timeout in millis
 clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
 
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is 
NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+itemTypeToRefreshPolicy=${org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy:-}
+
 # Retrun error in docs are missing in es aggregation calculation
 
aggQueryThrowOnMissingDocs=${org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs:-false}
 
diff --git 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index bdfc7de..96d46de 100644
--- 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -110,6 +110,14 @@ public interface PersistenceService {
     Object getSetting(String fieldName) throws NoSuchFieldException, 
IllegalAccessException;
 
     /**
+     * Return true if the item which is saved in the persistence service is 
consistent
+     *
+     * @param item the item to the check if consistent
+     * @return {@code true} if the item is consistent, false otherwise
+     */
+    boolean isConsistent(Item item);
+
+    /**
      * Persists the specified Item in the context server.
      *
      * @param item the item to persist
diff --git 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
index cc04165..19b1afa 100644
--- 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
+++ 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
@@ -117,8 +117,10 @@ public class SetEventOccurenceCountAction implements 
ActionExecutor {
 
         LocalDateTime eventTime = 
LocalDateTime.ofInstant(event.getTimeStamp().toInstant(),ZoneId.of("UTC"));
 
-        if (inTimeRange(eventTime, numberOfDays, fromDateTime, toDateTime)) {
-            count++;
+        if (!persistenceService.isConsistent(event)) {
+            if (inTimeRange(eventTime, numberOfDays, fromDateTime, 
toDateTime)) {
+                count++;
+            }
         }
 
         pastEvents.put((String) 
pastEventCondition.getParameter("generatedPropertyKey"), count);

Reply via email to