This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new a987d4b  Expose refresh policy (#280)
a987d4b is described below

commit a987d4b81d77ed3c4810f22011feae0da241022a
Author: liatiusim <[email protected]>
AuthorDate: Wed Apr 21 13:51:12 2021 +0300

    Expose refresh policy (#280)
---
 .../main/resources/etc/custom.system.properties    |  4 ++++
 .../ElasticSearchPersistenceServiceImpl.java       | 25 +++++++++++++++++++++-
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  3 +++
 .../org.apache.unomi.persistence.elasticsearch.cfg |  5 +++++
 .../unomi/persistence/spi/PersistenceService.java  |  8 +++++++
 .../actions/SetEventOccurenceCountAction.java      |  6 ++++--
 6 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/package/src/main/resources/etc/custom.system.properties 
b/package/src/main/resources/etc/custom.system.properties
index 668ec9f..92a0113 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -100,6 +100,10 @@ 
org.apache.unomi.elasticsearch.cluster.name=${env:UNOMI_ELASTICSEARCH_CLUSTERNAM
 # hostA:9200,hostB:9200
 # Note: the port number must be repeated for each host.
 
org.apache.unomi.elasticsearch.addresses=${env:UNOMI_ELASTICSEARCH_ADDRESSES:-localhost:9200}
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is 
NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy=${env:UNOMI_ELASTICSEARCH_REFRESH_POLICY_PER_ITEM_TYPE:-}
 
org.apache.unomi.elasticsearch.fatalIllegalStateErrors=${env:UNOMI_ELASTICSEARCH_FATAL_STATE_ERRORS:-}
 
org.apache.unomi.elasticsearch.index.prefix=${env:UNOMI_ELASTICSEARCH_INDEXPREFIX:-context}
 
org.apache.unomi.elasticsearch.monthlyIndex.nbShards=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_SHARDS:-5}
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 6ee5be0..eac14c7 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -17,13 +17,14 @@
 
 package org.apache.unomi.persistence.elasticsearch;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.hazelcast.core.HazelcastInstance;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.lucene.search.TotalHits;
@@ -167,6 +168,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
@@ -242,6 +244,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
     private boolean aggQueryThrowOnMissingDocs = false;
     private Integer aggQueryMaxResponseSizeHttp = null;
     private Integer clientSocketTimeout = null;
+    private Map<String, WriteRequest.RefreshPolicy> itemTypeToRefreshPolicy = 
new HashMap<>();
 
     private Map<String, Map<String, Map<String, Object>>> knownMappings = new 
HashMap<>();
 
@@ -262,6 +265,13 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         }
     }
 
+    public void setItemTypeToRefreshPolicy(String itemTypeToRefreshPolicy) 
throws IOException {
+        if (!itemTypeToRefreshPolicy.isEmpty()) {
+            this.itemTypeToRefreshPolicy = new 
ObjectMapper().readValue(itemTypeToRefreshPolicy,
+                        new TypeReference<HashMap<String, 
WriteRequest.RefreshPolicy>>() {});
+        }
+    }
+
     public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
         this.fatalIllegalStateErrors = 
Arrays.stream(fatalIllegalStateErrors.split(","))
                 .map(i -> i.trim()).filter(i -> 
!i.isEmpty()).toArray(String[]::new);
@@ -842,6 +852,11 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
     }
 
     @Override
+    public boolean isConsistent(Item item) {
+        return getRefreshPolicy(item.getItemType()) != 
WriteRequest.RefreshPolicy.NONE;
+    }
+
+    @Override
     public boolean save(final Item item) {
         return save(item, useBatchingForSave, alwaysOverwrite);
     }
@@ -887,6 +902,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
 
                     try {
                         if (bulkProcessor == null || !useBatching) {
+                            
indexRequest.setRefreshPolicy(getRefreshPolicy(item.getItemType()));
                             IndexResponse response = 
client.index(indexRequest, RequestOptions.DEFAULT);
                             setMetadata(item, response.getId(), 
response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
                         } else {
@@ -2403,4 +2419,11 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         return INDEX_DATE_PREFIX + d;
     }
 
+    private WriteRequest.RefreshPolicy getRefreshPolicy(String itemType) {
+        if (itemTypeToRefreshPolicy.containsKey(itemType)) {
+            return itemTypeToRefreshPolicy.get(itemType);
+        }
+        return WriteRequest.RefreshPolicy.NONE;
+    }
+
 }
diff --git 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5c2d12c..62c3cdf 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -56,6 +56,7 @@
             <cm:property name="clientSocketTimeout" value="" />
             <cm:property name="aggQueryMaxResponseSizeHttp" value="" />
             <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
+            <cm:property name="itemTypeToRefreshPolicy" value="" />
             <cm:property name="itemClassesToCache" value="" />
             <cm:property name="useBatchingForSave" value="false" />
             <cm:property name="useBatchingForUpdate" value="true" />
@@ -129,6 +130,8 @@
         <property name="aggregateQueryBucketSize" 
value="${es.aggregateQueryBucketSize}" />
         <property name="aggQueryMaxResponseSizeHttp" 
value="${es.aggQueryMaxResponseSizeHttp}" />
         <property name="aggQueryThrowOnMissingDocs" 
value="${es.aggQueryThrowOnMissingDocs}" />
+        <property name="itemTypeToRefreshPolicy" 
value="${es.itemTypeToRefreshPolicy}" />
+
         <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" 
/>
 
         <property name="metricsService" ref="metricsService" />
diff --git 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ce2fb67..71c2577 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -60,6 +60,11 @@ 
pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePa
 # max socket timeout in millis
 clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
 
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is 
NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+itemTypeToRefreshPolicy=${org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy:-}
+
 # Retrun error in docs are missing in es aggregation calculation
 
aggQueryThrowOnMissingDocs=${org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs:-false}
 
diff --git 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 6fde7ee..36a4b50 100644
--- 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -111,6 +111,14 @@ public interface PersistenceService {
     Object getSetting(String fieldName) throws NoSuchFieldException, 
IllegalAccessException;
 
     /**
+     * Return true if the item which is saved in the persistence service is 
consistent
+     *
+     * @param item the item to the check if consistent
+     * @return {@code true} if the item is consistent, false otherwise
+     */
+    boolean isConsistent(Item item);
+
+    /**
      * Persists the specified Item in the context server.
      *
      * @param item the item to persist
diff --git 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
index cc04165..19b1afa 100644
--- 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
+++ 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
@@ -117,8 +117,10 @@ public class SetEventOccurenceCountAction implements 
ActionExecutor {
 
         LocalDateTime eventTime = 
LocalDateTime.ofInstant(event.getTimeStamp().toInstant(),ZoneId.of("UTC"));
 
-        if (inTimeRange(eventTime, numberOfDays, fromDateTime, toDateTime)) {
-            count++;
+        if (!persistenceService.isConsistent(event)) {
+            if (inTimeRange(eventTime, numberOfDays, fromDateTime, 
toDateTime)) {
+                count++;
+            }
         }
 
         pastEvents.put((String) 
pastEventCondition.getParameter("generatedPropertyKey"), count);

Reply via email to