This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch unomi-1.x in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-1.x by this push: new 1c2b78c0e UNOMI-784: task backport (#638) 1c2b78c0e is described below commit 1c2b78c0e7682e55ba11d37ac82e4e429f52b17b Author: kevan Jahanshahi <jke...@apache.org> AuthorDate: Thu Aug 3 18:05:59 2023 +0200 UNOMI-784: task backport (#638) --- .../test/java/org/apache/unomi/itests/BaseIT.java | 1 + .../org/apache/unomi/itests/ProfileServiceIT.java | 13 +- .../main/resources/etc/custom.system.properties | 15 +- .../ElasticSearchPersistenceServiceImpl.java | 190 +++++++++++---------- .../client/CustomRestHighLevelClient.java | 76 +++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 6 +- .../org.apache.unomi.persistence.elasticsearch.cfg | 18 +- .../unomi/persistence/spi/PersistenceService.java | 15 ++ .../actions/MergeProfilesOnPropertyAction.java | 3 +- 9 files changed, 239 insertions(+), 98 deletions(-) diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java index c8420a324..0ba3ac46f 100644 --- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java @@ -175,6 +175,7 @@ public abstract class BaseIT { editConfigurationFilePut("etc/org.apache.karaf.features.cfg", "serviceRequirements", "disable"), // editConfigurationFilePut("etc/org.ops4j.pax.web.cfg", "org.osgi.service.http.port", HTTP_PORT), // systemProperty("org.osgi.service.http.port").value(HTTP_PORT), + editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.elasticsearch.taskWaitingPollingInterval", "50"), systemProperty("org.ops4j.pax.exam.rbc.rmi.port").value("1199"), systemProperty("org.apache.unomi.itests.elasticsearch.transport.port").value("9500"), systemProperty("org.apache.unomi.itests.elasticsearch.cluster.name").value("contextElasticSearchITests"), diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java 
b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java index fe82fa749..8264f5883 100644 --- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java @@ -134,8 +134,17 @@ public class ProfileServiceIT extends BaseIT { public void testGetProfileWithWrongScrollerIdThrowException() throws InterruptedException, NoSuchFieldException, IllegalAccessException, IOException { boolean throwExceptionCurrent = false; Configuration elasticSearchConfiguration = configurationAdmin.getConfiguration("org.apache.unomi.persistence.elasticsearch"); - if (elasticSearchConfiguration != null) { - throwExceptionCurrent = Boolean.getBoolean((String) elasticSearchConfiguration.getProperties().get("throwExceptions")); + if (elasticSearchConfiguration != null && elasticSearchConfiguration.getProperties().get("throwExceptions") != null) { + try { + if (elasticSearchConfiguration.getProperties().get("throwExceptions") instanceof String) { + throwExceptionCurrent = Boolean.parseBoolean((String) elasticSearchConfiguration.getProperties().get("throwExceptions")); + } else { + // already a boolean + throwExceptionCurrent = (Boolean) elasticSearchConfiguration.getProperties().get("throwExceptions"); + } + } catch (Throwable e) { + // Not able to cast the property + } } updateConfiguration(PersistenceService.class.getName(), "org.apache.unomi.persistence.elasticsearch", "throwExceptions", true); diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties index 0628777f2..5c45700e1 100644 --- a/package/src/main/resources/etc/custom.system.properties +++ b/package/src/main/resources/etc/custom.system.properties @@ -118,7 +118,20 @@ org.apache.unomi.elasticsearch.defaultIndex.indexMaxDocValueFieldsSearch=${env:U org.apache.unomi.elasticsearch.defaultQueryLimit=${env:UNOMI_ELASTICSEARCH_DEFAULTQUERYLIMIT:-10} 
org.apache.unomi.elasticsearch.aggregateQueryBucketSize=${env:UNOMI_ELASTICSEARCH_AGGREGATEBUCKETSIZE:-5000} org.apache.unomi.elasticsearch.maximumIdsQueryCount=${env:UNOMI_ELASTICSEARCH_MAXIMUMIDSQUERYCOUNT:-5000} -org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-} +# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the timeout for waiting for data or, put differently, a maximum period of inactivity between two consecutive data packets. +# A timeout value of zero is interpreted as an infinite timeout. A negative value is interpreted as undefined (system default). +# Default: -1 (System default) +org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:--1} +# Defines the waiting for task completion timeout in milliseconds. +# Some operations like update_by_query and delete_by_query are delegated to ElasticSearch using tasks. +# For consistency the thread that triggers one of those operations will wait for the task to be completed on the ElasticSearch side. +# This timeout configuration is here to ensure not blocking the thread infinitely, in case of very long running tasks. +# A timeout value of zero or negative is interpreted as an infinite timeout. 
+# Default: 3600000 (one hour) +org.apache.unomi.elasticsearch.taskWaitingTimeout=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_TIMEOUT:-3600000} +# Defines the polling interval in milliseconds, which is used to check if task is completed on ElasticSearch side +# Default: 1000 (1 second) +org.apache.unomi.elasticsearch.taskWaitingPollingInterval=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_POLLING_INTERVAL:-1000} org.apache.unomi.elasticsearch.pastEventsDisablePartitions=${env:UNOMI_ELASTICSEARCH_PAST_EVENTS_DISABLE_PARTITIONS:-false} org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_THROW_ON_MISSING_DOCS:-false} org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_MAX_RESPONSE_SIZE_HTTP:-} diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 6c0a0fa63..589bca1b1 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -62,27 +62,22 @@ import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Node; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.*; import 
org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.client.core.MainResponse; import org.elasticsearch.client.indices.*; +import org.elasticsearch.client.tasks.GetTaskRequest; +import org.elasticsearch.client.tasks.GetTaskResponse; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.*; import org.elasticsearch.index.reindex.*; @@ -111,6 +106,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.TaskId; import org.osgi.framework.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,18 +126,7 @@ import java.security.SecureRandom; import java.security.cert.X509Certificate; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; import 
java.util.concurrent.TimeUnit; import static org.elasticsearch.index.query.QueryBuilders.termQuery; @@ -165,7 +150,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); private boolean throwExceptions = false; - private RestHighLevelClient client; + private CustomRestHighLevelClient client; private BulkProcessor bulkProcessor; private String elasticSearchAddresses; private List<String> elasticSearchAddressList = new ArrayList<>(); @@ -190,6 +175,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Integer defaultQueryLimit = 10; private Integer removeByQueryTimeoutInMinutes = 10; + private Integer taskWaitingTimeout = 3600000; + private Integer taskWaitingPollingInterval = 1000; private String itemsMonthlyIndexedOverride = "event,session"; private String bulkProcessorConcurrentRequests = "1"; @@ -394,6 +381,18 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.logLevelRestClient = logLevelRestClient; } + public void setTaskWaitingTimeout(String taskWaitingTimeout) { + if (StringUtils.isNumeric(taskWaitingTimeout)) { + this.taskWaitingTimeout = Integer.parseInt(taskWaitingTimeout); + } + } + + public void setTaskWaitingPollingInterval(String taskWaitingPollingInterval) { + if (StringUtils.isNumeric(taskWaitingPollingInterval)) { + this.taskWaitingPollingInterval = Integer.parseInt(taskWaitingPollingInterval); + } + } + public void start() throws Exception { // Work around to avoid ES Logs regarding the deprecated [ignore_throttled] parameter @@ -471,7 +470,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.info(this.getClass().getName() + " service started successfully."); } - private void buildClient() { + private void buildClient() throws NoSuchFieldException, IllegalAccessException { List<Node> nodeList 
= new ArrayList<>(); for (String elasticSearchAddress : elasticSearchAddressList) { String[] elasticSearchAddressParts = elasticSearchAddress.split(":"); @@ -526,7 +525,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, }); logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index prefix " + indexPrefix + "..."); - client = new RestHighLevelClient(clientBuilder); + client = new CustomRestHighLevelClient(clientBuilder); } public BulkProcessor getBulkProcessor() { @@ -989,59 +988,55 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (int i = 0; i < scripts.length; i++) { builtScripts[i] = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]); } - return updateWithQueryAndScript(dateHint, clazz, builtScripts, conditions); + + return updateWithQueryAndScript(dateHint, new Class<?>[]{clazz}, builtScripts, conditions, true); } @Override public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions) { + return updateWithQueryAndStoredScript(dateHint, new Class<?>[]{clazz}, scripts, scriptParams, conditions, true); + } + + @Override + public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?>[] classes, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions, boolean waitForComplete) { Script[] builtScripts = new Script[scripts.length]; for (int i = 0; i < scripts.length; i++) { builtScripts[i] = new Script(ScriptType.STORED, null, scripts[i], scriptParams[i]); } - return updateWithQueryAndScript(dateHint, clazz, builtScripts, conditions); + return updateWithQueryAndScript(dateHint, classes, builtScripts, conditions, waitForComplete); } - private boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final Script[] scripts, final Condition[] conditions) { + private boolean 
updateWithQueryAndScript(final Date dateHint, final Class<?>[] classes, final Script[] scripts, final Condition[] conditions, boolean waitForComplete) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... args) throws Exception { - try { - String itemType = Item.getItemType(clazz); - - String index = getIndexNameForQuery(itemType); + String[] itemTypes = Arrays.stream(classes).map(Item::getItemType).toArray(String[]::new); + String[] indices = Arrays.stream(itemTypes).map(itemType -> getIndexNameForQuery(itemType)).toArray(String[]::new); + try { for (int i = 0; i < scripts.length; i++) { - RefreshRequest refreshRequest = new RefreshRequest(index); + RefreshRequest refreshRequest = new RefreshRequest(indices); client.indices().refresh(refreshRequest, RequestOptions.DEFAULT); - UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index); + QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.buildFilter(conditions[i]); + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indices); updateByQueryRequest.setConflicts("proceed"); updateByQueryRequest.setMaxRetries(1000); updateByQueryRequest.setSlices(2); updateByQueryRequest.setScript(scripts[i]); - updateByQueryRequest.setQuery(conditionESQueryBuilderDispatcher.buildFilter(conditions[i])); - - BulkByScrollResponse response = client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); + updateByQueryRequest.setQuery(queryBuilder); - if (response.getBulkFailures().size() > 0) { - for (BulkItemResponse.Failure failure : response.getBulkFailures()) { - logger.error("Failure : cause={} , message={}", failure.getCause(), failure.getMessage()); - } + TaskSubmissionResponse taskResponse = client.submitUpdateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); + if (taskResponse == null) { + 
logger.error("update with query and script: no response returned for query: {}", queryBuilder); + } else if (waitForComplete) { + waitForTaskComplete(updateByQueryRequest, taskResponse); } else { - logger.info("Update with query and script processed {} entries in {}.", response.getUpdated(), response.getTook().toString()); - } - if (response.isTimedOut()) { - logger.error("Update with query and script ended with timeout!"); - } - if (response.getVersionConflicts() > 0) { - logger.warn("Update with query and script ended with {} version conflicts!", response.getVersionConflicts()); - } - if (response.getNoops() > 0) { - logger.warn("Update Bwith query and script ended with {} noops!", response.getNoops()); + logger.debug("ES task started {}", taskResponse.getTask()); } } return true; } catch (IndexNotFoundException e) { - throw new Exception("No index found for itemType=" + clazz.getName(), e); + throw new Exception("No index found for itemTypes=" + String.join(",", itemTypes), e); } catch (ScriptException e) { logger.error("Error in the update script : {}\n{}\n{}", e.getScript(), e.getDetailedMessage(), e.getScriptStack()); throw new Exception("Error in the update script"); @@ -1055,6 +1050,53 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } + private void waitForTaskComplete(AbstractBulkByScrollRequest request, TaskSubmissionResponse response) { + if (logger.isDebugEnabled()) { + logger.debug("Waiting task [{}]: [{}] using query: [{}], polling every {}ms with a timeout configured to {}ms", + response.getTask(), request.toString(), request.getSearchRequest().source().query(), taskWaitingPollingInterval, taskWaitingTimeout); + } + long start = System.currentTimeMillis(); + new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".waitForTask", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { + protected Void execute(Object... 
args) throws Exception { + + TaskId taskId = new TaskId(response.getTask()); + while (true){ + Optional<GetTaskResponse> getTaskResponseOptional = client.tasks().get(new GetTaskRequest(taskId.getNodeId(), taskId.getId()), RequestOptions.DEFAULT); + if (getTaskResponseOptional.isPresent()) { + GetTaskResponse getTaskResponse = getTaskResponseOptional.get(); + if (getTaskResponse.isCompleted()) { + if (logger.isDebugEnabled()) { + long millis = getTaskResponse.getTaskInfo().getRunningTimeNanos() / 1_000_000; + long seconds = millis / 1000; + + logger.debug("Waiting task [{}]: Finished in {} {}", taskId, + seconds >= 1 ? seconds : millis, + seconds >= 1 ? "seconds" : "milliseconds"); + } + break; + } else { + if ((start + taskWaitingTimeout) < System.currentTimeMillis()) { + logger.error("Waiting task [{}]: Exceeded configured timeout ({}ms), aborting wait process", taskId, taskWaitingTimeout); + break; + } + + try { + Thread.sleep(taskWaitingPollingInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Waiting task [{}]: interrupted"); + } + } + } else { + logger.error("Waiting task [{}]: No task found", taskId); + break; + } + } + return null; + } + }.catchingExecuteInClassLoader(true); + } + @Override public boolean storeScripts(Map<String, String> scripts) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".storeScripts", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { @@ -1161,8 +1203,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Boolean execute(Object... 
args) throws Exception { try { String itemType = Item.getItemType(clazz); + QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.getQueryBuilder(query); final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType)) - .setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query)) + .setQuery(queryBuilder) // Setting slices to auto will let Elasticsearch choose the number of slices to use. // This setting will use one slice per shard, up to a certain limit. // The delete request will be more efficient and faster than no slicing. @@ -1176,46 +1219,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // So we increase default timeout of 1min to 10min .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes)); - BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); + TaskSubmissionResponse taskResponse = client.submitDeleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); - if (bulkByScrollResponse == null) { - logger.error("Remove by query: no response returned for query: {}", query); + if (taskResponse == null) { + logger.error("Remove by query: no response returned for query: {}", queryBuilder); return false; } - if (bulkByScrollResponse.isTimedOut()) { - logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query); - } - - if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) || - bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { - logger.warn("Remove by query: we found some failure during the process of query: {}", query); - - - if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) { - for (ScrollableHitSource.SearchFailure searchFailure : 
bulkByScrollResponse.getSearchFailures()) { - logger.warn("Remove by query, search failure: {}", searchFailure.toString()); - } - } - - if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { - for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) { - logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString()); - } - } - } - - if (logger.isDebugEnabled()) { - logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}", - bulkByScrollResponse.getTook().toHumanReadableString(1), - bulkByScrollResponse.getDeleted(), - bulkByScrollResponse.getBatches(), - bulkByScrollResponse.getNoops(), - bulkByScrollResponse.getVersionConflicts(), - bulkByScrollResponse.getSearchRetries(), - bulkByScrollResponse.getBulkRetries(), - query); - } + waitForTaskComplete(deleteByQueryRequest, taskResponse); return true; } catch (Exception e) { @@ -1230,7 +1241,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - public boolean indexTemplateExists(final String templateName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... args) throws IOException { diff --git a/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java new file mode 100644 index 000000000..8fff8dea6 --- /dev/null +++ b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.client.tasks.TaskSubmissionResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; + +import java.io.IOException; + +import static java.util.Collections.emptySet; + +/** + * A custom Rest high level client that provide a way of using Task system on updateByQuery and deleteByQuery, + * by returning the response immediately (wait_for_completion set to false) + * see org.elasticsearch.client.RestHighLevelClient for original code. + */ +public class CustomRestHighLevelClient extends RestHighLevelClient { + + public CustomRestHighLevelClient(RestClientBuilder restClientBuilder) { + super(restClientBuilder); + } + + /** + * Executes a delete by query request. + * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html"> + * Delete By Query API on elastic.co</a> + * + * @param deleteByQueryRequest the request + * @param options the request options (e.g. 
headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final TaskSubmissionResponse submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + deleteByQueryRequest, innerDeleteByQueryRequest -> { + Request request = RequestConverters.deleteByQuery(innerDeleteByQueryRequest); + request.addParameter("wait_for_completion", "false"); + return request; + }, options, TaskSubmissionResponse::fromXContent, emptySet() + ); + } + + /** + * Executes a update by query request. + * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html"> + * Update By Query API on elastic.co</a> + * + * @param updateByQueryRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final TaskSubmissionResponse submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + updateByQueryRequest, innerUpdateByQueryRequest -> { + Request request = RequestConverters.updateByQuery(updateByQueryRequest); + request.addParameter("wait_for_completion", "false"); + return request; + }, options, TaskSubmissionResponse::fromXContent, emptySet() + ); + } +} diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 9c3800f76..bdf2dcc12 100644 --- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -54,7 +54,9 @@ <cm:property name="maximalElasticSearchVersion" value="8.0.0" /> <cm:property name="aggregateQueryBucketSize" value="5000" /> - <cm:property 
name="clientSocketTimeout" value="" /> + <cm:property name="clientSocketTimeout" value="-1" /> + <cm:property name="taskWaitingTimeout" value="3600000" /> + <cm:property name="taskWaitingPollingInterval" value="1000" /> <cm:property name="aggQueryMaxResponseSizeHttp" value="" /> <cm:property name="aggQueryThrowOnMissingDocs" value="false" /> <cm:property name="itemTypeToRefreshPolicy" value="" /> @@ -131,6 +133,8 @@ <property name="itemTypeToRefreshPolicy" value="${es.itemTypeToRefreshPolicy}" /> <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" /> + <property name="taskWaitingTimeout" value="${es.taskWaitingTimeout}" /> + <property name="taskWaitingPollingInterval" value="${es.taskWaitingPollingInterval}" /> <property name="metricsService" ref="metricsService" /> <property name="useBatchingForSave" value="${es.useBatchingForSave}" /> diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index b996c2a1c..5af2232e4 100644 --- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -57,8 +57,22 @@ maximumIdsQueryCount=${org.apache.unomi.elasticsearch.maximumIdsQueryCount:-5000 # Disable partitions on aggregation queries for past events. pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePartitions:-false} -# max socket timeout in millis -clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-} +# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the timeout for waiting for data or, put differently, a maximum period inactivity between two consecutive data packets). +# A timeout value of zero is interpreted as an infinite timeout. 
A negative value is interpreted as undefined (system default). +# Default: -1 +clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:--1} + +# Defines the waiting for task completion timeout in milliseconds. +# Some operations like update_by_query and delete_by_query are delegated to ElasticSearch using tasks. +# For consistency the thread that triggers one of those operations will wait for the task to be completed on the ElasticSearch side. +# This timeout configuration is here to ensure not blocking the thread infinitely, in case of very long running tasks. +# A timeout value of zero or negative is interpreted as an infinite timeout. +# Default: 3600000 (1 hour) +taskWaitingTimeout=${org.apache.unomi.elasticsearch.taskWaitingTimeout:-3600000} + +# Defines the polling interval in milliseconds, which is used to check if the task is completed on the ElasticSearch side +# Default: 1000 (1 second) +taskWaitingPollingInterval=${org.apache.unomi.elasticsearch.taskWaitingPollingInterval:-1000} # refresh policy per item type in Json. # Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is NONE. 
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java index c4e62f66f..c4cea9943 100644 --- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java @@ -205,6 +205,21 @@ public interface PersistenceService { */ boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions); + /** + * Updates the items of the specified class by a query with a new property value for the specified property name + * based on provided stored scripts and script parameters, + * This one is able to perform an update on multiple types in a single run, be careful with your query as it will be performed on all of them. + * + * @param dateHint a Date helping in identifying where the item is located + * @param classes classes of items to update, be careful all of them will be submitted to update for all scripts/conditions + * @param scripts Stored scripts name + * @param scriptParams script params array + * @param conditions conditions array + * @param waitForComplete if true, wait for the ES execution to be complete + * @return {@code true} if the update was successful, {@code false} otherwise + */ + boolean updateWithQueryAndStoredScript(Date dateHint, Class<?>[] classes, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions, boolean waitForComplete); + /** * Store script in the Database for later usage with updateWithQueryAndStoredScript function for example. 
* diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java index 3d90f6317..bef940e0d 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java @@ -201,8 +201,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { Map<String, Object>[] scriptParams = new Map[]{Collections.singletonMap("profileId", masterProfileId)}; Condition[] conditions = new Condition[]{profileIdsCondition}; - persistenceService.updateWithQueryAndStoredScript(null, Session.class, scripts, scriptParams, conditions); - persistenceService.updateWithQueryAndStoredScript(null, Event.class, scripts, scriptParams, conditions); + persistenceService.updateWithQueryAndStoredScript(null, new Class[]{Session.class, Event.class}, scripts, scriptParams, conditions, false); } else { for (String mergedProfileId : mergedProfileIds) { privacyService.anonymizeBrowsingData(mergedProfileId);