This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new a987d4b Expose refresh policy (#280)
a987d4b is described below
commit a987d4b81d77ed3c4810f22011feae0da241022a
Author: liatiusim <[email protected]>
AuthorDate: Wed Apr 21 13:51:12 2021 +0300
Expose refresh policy (#280)
---
.../main/resources/etc/custom.system.properties | 4 ++++
.../ElasticSearchPersistenceServiceImpl.java | 25 +++++++++++++++++++++-
.../resources/OSGI-INF/blueprint/blueprint.xml | 3 +++
.../org.apache.unomi.persistence.elasticsearch.cfg | 5 +++++
.../unomi/persistence/spi/PersistenceService.java | 8 +++++++
.../actions/SetEventOccurenceCountAction.java | 6 ++++--
6 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/package/src/main/resources/etc/custom.system.properties
b/package/src/main/resources/etc/custom.system.properties
index 668ec9f..92a0113 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -100,6 +100,10 @@
org.apache.unomi.elasticsearch.cluster.name=${env:UNOMI_ELASTICSEARCH_CLUSTERNAM
# hostA:9200,hostB:9200
# Note: the port number must be repeated for each host.
org.apache.unomi.elasticsearch.addresses=${env:UNOMI_ELASTICSEARCH_ADDRESSES:-localhost:9200}
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is
NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy=${env:UNOMI_ELASTICSEARCH_REFRESH_POLICY_PER_ITEM_TYPE:-}
org.apache.unomi.elasticsearch.fatalIllegalStateErrors=${env:UNOMI_ELASTICSEARCH_FATAL_STATE_ERRORS:-}
org.apache.unomi.elasticsearch.index.prefix=${env:UNOMI_ELASTICSEARCH_INDEXPREFIX:-context}
org.apache.unomi.elasticsearch.monthlyIndex.nbShards=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_SHARDS:-5}
diff --git
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 6ee5be0..eac14c7 100644
---
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -17,13 +17,14 @@
package org.apache.unomi.persistence.elasticsearch;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.HazelcastInstance;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.lucene.search.TotalHits;
@@ -167,6 +168,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -242,6 +244,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
private boolean aggQueryThrowOnMissingDocs = false;
private Integer aggQueryMaxResponseSizeHttp = null;
private Integer clientSocketTimeout = null;
+ private Map<String, WriteRequest.RefreshPolicy> itemTypeToRefreshPolicy =
new HashMap<>();
private Map<String, Map<String, Map<String, Object>>> knownMappings = new
HashMap<>();
@@ -262,6 +265,13 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
+ public void setItemTypeToRefreshPolicy(String itemTypeToRefreshPolicy)
throws IOException {
+ if (!itemTypeToRefreshPolicy.isEmpty()) {
+ this.itemTypeToRefreshPolicy = new
ObjectMapper().readValue(itemTypeToRefreshPolicy,
+ new TypeReference<HashMap<String,
WriteRequest.RefreshPolicy>>() {});
+ }
+ }
+
public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
this.fatalIllegalStateErrors =
Arrays.stream(fatalIllegalStateErrors.split(","))
.map(i -> i.trim()).filter(i ->
!i.isEmpty()).toArray(String[]::new);
@@ -842,6 +852,11 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
@Override
+ public boolean isConsistent(Item item) {
+ return getRefreshPolicy(item.getItemType()) !=
WriteRequest.RefreshPolicy.NONE;
+ }
+
+ @Override
public boolean save(final Item item) {
return save(item, useBatchingForSave, alwaysOverwrite);
}
@@ -887,6 +902,7 @@ public class ElasticSearchPersistenceServiceImpl implements
PersistenceService,
try {
if (bulkProcessor == null || !useBatching) {
+
indexRequest.setRefreshPolicy(getRefreshPolicy(item.getItemType()));
IndexResponse response =
client.index(indexRequest, RequestOptions.DEFAULT);
setMetadata(item, response.getId(),
response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
} else {
@@ -2403,4 +2419,11 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
return INDEX_DATE_PREFIX + d;
}
+ private WriteRequest.RefreshPolicy getRefreshPolicy(String itemType) {
+ if (itemTypeToRefreshPolicy.containsKey(itemType)) {
+ return itemTypeToRefreshPolicy.get(itemType);
+ }
+ return WriteRequest.RefreshPolicy.NONE;
+ }
+
}
diff --git
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5c2d12c..62c3cdf 100644
---
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -56,6 +56,7 @@
<cm:property name="clientSocketTimeout" value="" />
<cm:property name="aggQueryMaxResponseSizeHttp" value="" />
<cm:property name="aggQueryThrowOnMissingDocs" value="false" />
+ <cm:property name="itemTypeToRefreshPolicy" value="" />
<cm:property name="itemClassesToCache" value="" />
<cm:property name="useBatchingForSave" value="false" />
<cm:property name="useBatchingForUpdate" value="true" />
@@ -129,6 +130,8 @@
<property name="aggregateQueryBucketSize"
value="${es.aggregateQueryBucketSize}" />
<property name="aggQueryMaxResponseSizeHttp"
value="${es.aggQueryMaxResponseSizeHttp}" />
<property name="aggQueryThrowOnMissingDocs"
value="${es.aggQueryThrowOnMissingDocs}" />
+ <property name="itemTypeToRefreshPolicy"
value="${es.itemTypeToRefreshPolicy}" />
+
<property name="clientSocketTimeout" value="${es.clientSocketTimeout}"
/>
<property name="metricsService" ref="metricsService" />
diff --git
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ce2fb67..71c2577 100644
---
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -60,6 +60,11 @@
pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePa
# max socket timeout in millis
clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is
NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+itemTypeToRefreshPolicy=${org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy:-}
+
# Retrun error in docs are missing in es aggregation calculation
aggQueryThrowOnMissingDocs=${org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs:-false}
diff --git
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 6fde7ee..36a4b50 100644
---
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -111,6 +111,14 @@ public interface PersistenceService {
Object getSetting(String fieldName) throws NoSuchFieldException,
IllegalAccessException;
/**
+ * Return true if the item which is saved in the persistence service is
consistent
+ *
+ * @param item the item to the check if consistent
+ * @return {@code true} if the item is consistent, false otherwise
+ */
+ boolean isConsistent(Item item);
+
+ /**
* Persists the specified Item in the context server.
*
* @param item the item to persist
diff --git
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
index cc04165..19b1afa 100644
---
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
+++
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
@@ -117,8 +117,10 @@ public class SetEventOccurenceCountAction implements
ActionExecutor {
LocalDateTime eventTime =
LocalDateTime.ofInstant(event.getTimeStamp().toInstant(),ZoneId.of("UTC"));
- if (inTimeRange(eventTime, numberOfDays, fromDateTime, toDateTime)) {
- count++;
+ if (!persistenceService.isConsistent(event)) {
+ if (inTimeRange(eventTime, numberOfDays, fromDateTime,
toDateTime)) {
+ count++;
+ }
}
pastEvents.put((String)
pastEventCondition.getParameter("generatedPropertyKey"), count);