This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push: new 64169942b UNOMI-784: use tasks to perform update_by_query and delete_by_query (#636) 64169942b is described below commit 64169942b2ca16ee4bd9d1a5cd2f047fd3c194d8 Author: kevan Jahanshahi <jke...@apache.org> AuthorDate: Thu Aug 3 12:28:34 2023 +0200 UNOMI-784: use tasks to perform update_by_query and delete_by_query (#636) * UNOMI-784: set ES client socket timeout to 80sec instead of 30sec by default. * use wait_for_completion for update by script / delete queries * Fix integration test * UNOMI-784: simplify the client wrapper * UNOMI-784: add some clarity to config related to socket timeout * UNOMI-784: wait for task to complete * UNOMI-784: improve merge updateByQuery to perform a single task instead of two separates * UNOMI-784: Typo * UNOMI-784: add configurations regarding tasks waiting timeout and polling interval * UNOMI-784: set task completion log level to debug * Don't wait for task to be complete for merge profile ES script * remove non required test * UNOMI-784: Try speed up the tests and set waitForCompletion to be true in service --------- Co-authored-by: David Griffon <dgrif...@jahia.com> --- itests/pom.xml | 1 + .../test/java/org/apache/unomi/itests/BaseIT.java | 20 +- .../org/apache/unomi/itests/ProfileServiceIT.java | 13 +- .../java/org/apache/unomi/itests/SegmentIT.java | 7 +- .../main/resources/etc/custom.system.properties | 15 +- .../ElasticSearchPersistenceServiceImpl.java | 211 +++++++++++++-------- .../client/CustomRestHighLevelClient.java | 76 ++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 6 +- .../org.apache.unomi.persistence.elasticsearch.cfg | 18 +- .../unomi/persistence/spi/PersistenceService.java | 14 ++ .../actions/MergeProfilesOnPropertyAction.java | 3 +- 11 files changed, 282 insertions(+), 102 deletions(-) diff --git a/itests/pom.xml b/itests/pom.xml index 24e8b65ce..8bd5fd908 100644 --- a/itests/pom.xml +++ b/itests/pom.xml @@ -226,6 +226,7 @@ 
</environmentVariables> <instanceSettings> <properties> + <xpack.ml.enabled>false</xpack.ml.enabled> <path.repo>${project.build.directory}/snapshots_repository</path.repo> <cluster.routing.allocation.disk.threshold_enabled>false</cluster.routing.allocation.disk.threshold_enabled> <http.cors.allow-origin>*</http.cors.allow-origin> diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java index 7e70e40ab..28cf43c7b 100644 --- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java @@ -93,12 +93,7 @@ import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Dictionary; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.function.Predicate; import java.util.function.Supplier; @@ -256,6 +251,7 @@ public abstract class BaseIT extends KarafTestSupport { editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.graphql.feature.activated", "true"), editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.elasticsearch.cluster.name", "contextElasticSearchITests"), editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.elasticsearch.addresses", "localhost:9400"), + editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.elasticsearch.taskWaitingPollingInterval", "50"), systemProperty("org.ops4j.pax.exam.rbc.rmi.port").value("1199"), systemProperty("org.apache.unomi.hazelcast.group.name").value("cellar"), @@ -381,13 +377,23 @@ public abstract class BaseIT extends KarafTestSupport { persistenceService = getService(PersistenceService.class); definitionsService = 
getService(DefinitionsService.class); rulesService = getService(RulesService.class); + segmentService = getService(SegmentService.class); } public void updateConfiguration(String serviceName, String configPid, String propName, Object propValue) throws InterruptedException, IOException { + Map<String, Object> props = new HashMap<>(); + props.put(propName, propValue); + updateConfiguration(serviceName, configPid, props); + } + + public void updateConfiguration(String serviceName, String configPid, Map<String, Object> propsToSet) + throws InterruptedException, IOException { org.osgi.service.cm.Configuration cfg = configurationAdmin.getConfiguration(configPid); Dictionary<String, Object> props = cfg.getProperties(); - props.put(propName, propValue); + for (Map.Entry<String, Object> propToSet : propsToSet.entrySet()) { + props.put(propToSet.getKey(), propToSet.getValue()); + } waitForReRegistration(serviceName, () -> { try { diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java index 306f9d8c8..623904938 100644 --- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java @@ -155,8 +155,17 @@ public class ProfileServiceIT extends BaseIT { throws InterruptedException, NoSuchFieldException, IllegalAccessException, IOException { boolean throwExceptionCurrent = false; Configuration elasticSearchConfiguration = configurationAdmin.getConfiguration("org.apache.unomi.persistence.elasticsearch"); - if (elasticSearchConfiguration != null) { - throwExceptionCurrent = Boolean.getBoolean((String) elasticSearchConfiguration.getProperties().get("throwExceptions")); + if (elasticSearchConfiguration != null && elasticSearchConfiguration.getProperties().get("throwExceptions") != null) { + try { + if (elasticSearchConfiguration.getProperties().get("throwExceptions") instanceof String) { + throwExceptionCurrent = 
Boolean.parseBoolean((String) elasticSearchConfiguration.getProperties().get("throwExceptions")); + } else { + // already a boolean + throwExceptionCurrent = (Boolean) elasticSearchConfiguration.getProperties().get("throwExceptions"); + } + } catch (Throwable e) { + // Not able to cast the property + } } updateConfiguration(PersistenceService.class.getName(), "org.apache.unomi.persistence.elasticsearch", "throwExceptions", true); diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java index 12fb99da8..3b3aa9314 100644 --- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java @@ -39,6 +39,7 @@ import org.ops4j.pax.exam.junit.PaxExam; import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy; import org.ops4j.pax.exam.spi.reactors.PerSuite; import org.ops4j.pax.exam.util.Filter; +import org.osgi.service.cm.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,11 +47,7 @@ import javax.inject.Inject; import java.text.SimpleDateFormat; import java.time.LocalDate; import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; @RunWith(PaxExam.class) @ExamReactorStrategy(PerSuite.class) diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties index eb65ab026..d421c7b43 100644 --- a/package/src/main/resources/etc/custom.system.properties +++ b/package/src/main/resources/etc/custom.system.properties @@ -132,7 +132,20 @@ org.apache.unomi.elasticsearch.defaultIndex.indexMaxDocValueFieldsSearch=${env:U org.apache.unomi.elasticsearch.defaultQueryLimit=${env:UNOMI_ELASTICSEARCH_DEFAULTQUERYLIMIT:-10} org.apache.unomi.elasticsearch.aggregateQueryBucketSize=${env:UNOMI_ELASTICSEARCH_AGGREGATEBUCKETSIZE:-5000} 
org.apache.unomi.elasticsearch.maximumIdsQueryCount=${env:UNOMI_ELASTICSEARCH_MAXIMUMIDSQUERYCOUNT:-5000} -org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-} +# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the timeout for waiting for data or, put differently, a maximum period of inactivity between two consecutive data packets. +# A timeout value of zero is interpreted as an infinite timeout. A negative value is interpreted as undefined (system default). +# Default: -1 (System default) +org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:--1} +# Defines the waiting for task completion timeout in milliseconds. +# Some operations like update_by_query and delete_by_query are delegated to ElasticSearch using tasks. +# For consistency the thread that triggers one of those operations will wait for the task to be completed on ElasticSearch side. +# This timeout configuration is here to ensure not blocking the thread infinitely, in case of very long running tasks. +# A timeout value of zero or negative is interpreted as an infinite timeout. 
+# Default: 3600000 (one hour) +org.apache.unomi.elasticsearch.taskWaitingTimeout=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_TIMEOUT:-3600000} +# Defines the polling interval in milliseconds, which is used to check if task is completed on ElasticSearch side +# Default: 1000 (1 second) +org.apache.unomi.elasticsearch.taskWaitingPollingInterval=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_POLLING_INTERVAL:-1000} org.apache.unomi.elasticsearch.pastEventsDisablePartitions=${env:UNOMI_ELASTICSEARCH_PAST_EVENTS_DISABLE_PARTITIONS:-false} org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_THROW_ON_MISSING_DOCS:-false} org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_MAX_RESPONSE_SIZE_HTTP:-} diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 2ad652387..8ca6526f1 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -64,13 +64,16 @@ import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.*; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.client.core.MainResponse; import org.elasticsearch.client.indexlifecycle.*; import 
org.elasticsearch.client.indices.*; +import org.elasticsearch.client.tasks.GetTaskRequest; +import org.elasticsearch.client.tasks.GetTaskResponse; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.bytes.BytesReference; @@ -78,9 +81,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.*; import org.elasticsearch.index.reindex.*; @@ -109,6 +110,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.TaskId; import org.osgi.framework.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,7 +146,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private static final String ROLLOVER_LIFECYCLE_NAME = "unomi-rollover-policy"; private boolean throwExceptions = false; - private RestHighLevelClient client; + private CustomRestHighLevelClient client; private BulkProcessor bulkProcessor; private String elasticSearchAddresses; private List<String> elasticSearchAddressList = new ArrayList<>(); @@ -168,6 +170,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Integer defaultQueryLimit = 10; private Integer removeByQueryTimeoutInMinutes = 10; + private Integer taskWaitingTimeout = 
3600000; + private Integer taskWaitingPollingInterval = 1000; private String bulkProcessorConcurrentRequests = "1"; private String bulkProcessorBulkActions = "1000"; @@ -434,6 +438,18 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.logLevelRestClient = logLevelRestClient; } + public void setTaskWaitingTimeout(String taskWaitingTimeout) { + if (StringUtils.isNumeric(taskWaitingTimeout)) { + this.taskWaitingTimeout = Integer.parseInt(taskWaitingTimeout); + } + } + + public void setTaskWaitingPollingInterval(String taskWaitingPollingInterval) { + if (StringUtils.isNumeric(taskWaitingPollingInterval)) { + this.taskWaitingPollingInterval = Integer.parseInt(taskWaitingPollingInterval); + } + } + public void start() throws Exception { // Work around to avoid ES Logs regarding the deprecated [ignore_throttled] parameter @@ -505,7 +521,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.info(this.getClass().getName() + " service started successfully."); } - private void buildClient() { + private void buildClient() throws NoSuchFieldException, IllegalAccessException { List<Node> nodeList = new ArrayList<>(); for (String elasticSearchAddress : elasticSearchAddressList) { String[] elasticSearchAddressParts = elasticSearchAddress.split(":"); @@ -560,7 +576,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, }); logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index prefix " + indexPrefix + "..."); - client = new RestHighLevelClient(clientBuilder); + client = new CustomRestHighLevelClient(clientBuilder); } public BulkProcessor getBulkProcessor() { @@ -1087,64 +1103,59 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (int i = 0; i < scripts.length; i++) { builtScripts[i] = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]); } - return 
updateWithQueryAndScript(clazz, builtScripts, conditions); + return updateWithQueryAndScript(new Class<?>[]{clazz}, builtScripts, conditions, true); } @Override public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions) { - return updateWithQueryAndStoredScript(clazz, scripts, scriptParams, conditions); + return updateWithQueryAndStoredScript(new Class<?>[]{clazz}, scripts, scriptParams, conditions, true); } @Override public boolean updateWithQueryAndStoredScript(Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions) { + return updateWithQueryAndStoredScript(new Class<?>[]{clazz}, scripts, scriptParams, conditions, true); + } + + @Override + public boolean updateWithQueryAndStoredScript(Class<?>[] classes, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions, boolean waitForComplete) { Script[] builtScripts = new Script[scripts.length]; for (int i = 0; i < scripts.length; i++) { builtScripts[i] = new Script(ScriptType.STORED, null, scripts[i], scriptParams[i]); } - return updateWithQueryAndScript(clazz, builtScripts, conditions); + return updateWithQueryAndScript(classes, builtScripts, conditions, waitForComplete); } - private boolean updateWithQueryAndScript(final Class<?> clazz, final Script[] scripts, final Condition[] conditions) { + private boolean updateWithQueryAndScript(final Class<?>[] classes, final Script[] scripts, final Condition[] conditions, boolean waitForComplete) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { protected Boolean execute(Object... 
args) throws Exception { - try { - String itemType = Item.getItemType(clazz); - String index = getIndex(itemType); + String[] itemTypes = Arrays.stream(classes).map(Item::getItemType).toArray(String[]::new); + String[] indices = Arrays.stream(itemTypes).map(itemType -> getIndexNameForQuery(itemType)).toArray(String[]::new); + try { for (int i = 0; i < scripts.length; i++) { - RefreshRequest refreshRequest = new RefreshRequest(index); + RefreshRequest refreshRequest = new RefreshRequest(indices); client.indices().refresh(refreshRequest, RequestOptions.DEFAULT); - QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.buildFilter(conditions[i]); - UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index); + QueryBuilder queryBuilder = conditionESQueryBuilderDispatcher.buildFilter(conditions[i]); + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indices); updateByQueryRequest.setConflicts("proceed"); updateByQueryRequest.setMaxRetries(1000); updateByQueryRequest.setSlices(2); updateByQueryRequest.setScript(scripts[i]); - updateByQueryRequest.setQuery(isItemTypeSharingIndex(itemType) ? 
wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder); - - BulkByScrollResponse response = client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); + updateByQueryRequest.setQuery(wrapWithItemsTypeQuery(itemTypes, queryBuilder)); - if (response.getBulkFailures().size() > 0) { - for (BulkItemResponse.Failure failure : response.getBulkFailures()) { - logger.error("Failure : cause={} , message={}", failure.getCause(), failure.getMessage()); - } + TaskSubmissionResponse taskResponse = client.submitUpdateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); + if (taskResponse == null) { + logger.error("update with query and script: no response returned for query: {}", queryBuilder); + } else if (waitForComplete) { + waitForTaskComplete(updateByQueryRequest, taskResponse); } else { - logger.info("Update with query and script processed {} entries in {}.", response.getUpdated(), response.getTook().toString()); - } - if (response.isTimedOut()) { - logger.error("Update with query and script ended with timeout!"); - } - if (response.getVersionConflicts() > 0) { - logger.warn("Update with query and script ended with {} version conflicts!", response.getVersionConflicts()); - } - if (response.getNoops() > 0) { - logger.warn("Update Bwith query and script ended with {} noops!", response.getNoops()); + logger.debug("ES task started {}", taskResponse.getTask()); } } return true; } catch (IndexNotFoundException e) { - throw new Exception("No index found for itemType=" + clazz.getName(), e); + throw new Exception("No index found for itemTypes=" + String.join(",", itemTypes), e); } catch (ScriptException e) { logger.error("Error in the update script : {}\n{}\n{}", e.getScript(), e.getDetailedMessage(), e.getScriptStack()); throw new Exception("Error in the update script"); @@ -1158,6 +1169,53 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } + private void waitForTaskComplete(AbstractBulkByScrollRequest request, 
TaskSubmissionResponse response) { + if (logger.isDebugEnabled()) { + logger.debug("Waiting task [{}]: [{}] using query: [{}], polling every {}ms with a timeout configured to {}ms", + response.getTask(), request.toString(), request.getSearchRequest().source().query(), taskWaitingPollingInterval, taskWaitingTimeout); + } + long start = System.currentTimeMillis(); + new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".waitForTask", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { + protected Void execute(Object... args) throws Exception { + + TaskId taskId = new TaskId(response.getTask()); + while (true){ + Optional<GetTaskResponse> getTaskResponseOptional = client.tasks().get(new GetTaskRequest(taskId.getNodeId(), taskId.getId()), RequestOptions.DEFAULT); + if (getTaskResponseOptional.isPresent()) { + GetTaskResponse getTaskResponse = getTaskResponseOptional.get(); + if (getTaskResponse.isCompleted()) { + if (logger.isDebugEnabled()) { + long millis = getTaskResponse.getTaskInfo().getRunningTimeNanos() / 1_000_000; + long seconds = millis / 1000; + + logger.debug("Waiting task [{}]: Finished in {} {}", taskId, + seconds >= 1 ? seconds : millis, + seconds >= 1 ? 
"seconds" : "milliseconds"); + } + break; + } else { + if ((start + taskWaitingTimeout) < System.currentTimeMillis()) { + logger.error("Waiting task [{}]: Exceeded configured timeout ({}ms), aborting wait process", taskId, taskWaitingTimeout); + break; + } + + try { + Thread.sleep(taskWaitingPollingInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Waiting task [{}]: interrupted"); + } + } + } else { + logger.error("Waiting task [{}]: No task found", taskId); + break; + } + } + return null; + } + }.catchingExecuteInClassLoader(true); + } + @Override public boolean storeScripts(Map<String, String> scripts) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".storeScripts", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) { @@ -1295,7 +1353,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(getIndexNameForQuery(itemType)) - .setQuery(isItemTypeSharingIndex(itemType) ? wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder) + .setQuery(wrapWithItemTypeQuery(itemType, queryBuilder)) // Setting slices to auto will let Elasticsearch choose the number of slices to use. // This setting will use one slice per shard, up to a certain limit. // The delete request will be more efficient and faster than no slicing. 
@@ -1309,45 +1367,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // So we increase default timeout of 1min to 10min .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes)); - BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); + TaskSubmissionResponse taskResponse = client.submitDeleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); - if (bulkByScrollResponse == null) { + if (taskResponse == null) { logger.error("Remove by query: no response returned for query: {}", queryBuilder); return false; } - if (bulkByScrollResponse.isTimedOut()) { - logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, queryBuilder); - } - - if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) || - bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { - logger.warn("Remove by query: we found some failure during the process of query: {}", queryBuilder); - - if (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) { - for (ScrollableHitSource.SearchFailure searchFailure : bulkByScrollResponse.getSearchFailures()) { - logger.warn("Remove by query, search failure: {}", searchFailure.toString()); - } - } - - if (bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().size() > 0) { - for (BulkItemResponse.Failure bulkFailure : bulkByScrollResponse.getBulkFailures()) { - logger.warn("Remove by query, bulk failure: {}", bulkFailure.toString()); - } - } - } - - if (logger.isDebugEnabled()) { - logger.debug("Remove by query: took {}, deleted docs: {}, batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: {}, bulk retries: {}, for query: {}", - bulkByScrollResponse.getTook().toHumanReadableString(1), - 
bulkByScrollResponse.getDeleted(), - bulkByScrollResponse.getBatches(), - bulkByScrollResponse.getNoops(), - bulkByScrollResponse.getVersionConflicts(), - bulkByScrollResponse.getSearchRetries(), - bulkByScrollResponse.getBulkRetries(), - queryBuilder); - } + waitForTaskComplete(deleteByQueryRequest, taskResponse); return true; } catch (Exception e) { @@ -1942,7 +1969,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, CountRequest countRequest = new CountRequest(getIndexNameForQuery(itemType)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(isItemTypeSharingIndex(itemType) ? wrapWithItemTypeQuery(itemType, filter) : filter); + searchSourceBuilder.query(wrapWithItemTypeQuery(itemType, filter)); countRequest.source(searchSourceBuilder); CountResponse response = client.count(countRequest, RequestOptions.DEFAULT); return response.getCount(); @@ -1977,7 +2004,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .fetchSource(true) .seqNoAndPrimaryTerm(true) - .query(isItemTypeSharingIndex(itemType) ? wrapWithItemTypeQuery(itemType, query) : query) + .query(wrapWithItemTypeQuery(itemType, query)) .size(size < 0 ? defaultQueryLimit : size) .from(offset); if (scrollTimeValidity != null) { @@ -2281,15 +2308,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } if (filter != null) { - searchSourceBuilder.query(isItemTypeSharingIndex ? - wrapWithItemTypeQuery(itemType, conditionESQueryBuilderDispatcher.buildFilter(filter)) : - conditionESQueryBuilderDispatcher.buildFilter(filter)); + searchSourceBuilder.query(wrapWithItemTypeQuery(itemType, conditionESQueryBuilderDispatcher.buildFilter(filter))); } } else { if (filter != null) { - AggregationBuilder filterAggregation = AggregationBuilders.filter("filter", isItemTypeSharingIndex ? 
- wrapWithItemTypeQuery(itemType, conditionESQueryBuilderDispatcher.buildFilter(filter)) : - conditionESQueryBuilderDispatcher.buildFilter(filter)); + AggregationBuilder filterAggregation = AggregationBuilders.filter("filter", + wrapWithItemTypeQuery(itemType, conditionESQueryBuilderDispatcher.buildFilter(filter))); for (AggregationBuilder aggregationBuilder : lastAggregation) { filterAggregation.subAggregation(aggregationBuilder); } @@ -2666,10 +2690,33 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } private QueryBuilder wrapWithItemTypeQuery(String itemType, QueryBuilder originalQuery) { - BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery(); - wrappedQuery.must(getItemTypeQueryBuilder(itemType)); - wrappedQuery.must(originalQuery); - return wrappedQuery; + if (isItemTypeSharingIndex(itemType)) { + BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery(); + wrappedQuery.must(getItemTypeQueryBuilder(itemType)); + wrappedQuery.must(originalQuery); + return wrappedQuery; + } + return originalQuery; + } + + private QueryBuilder wrapWithItemsTypeQuery(String[] itemTypes, QueryBuilder originalQuery) { + if (itemTypes.length == 1) { + return wrapWithItemTypeQuery(itemTypes[0], originalQuery); + } + + if (Arrays.stream(itemTypes).anyMatch(this::isItemTypeSharingIndex)) { + BoolQueryBuilder itemTypeQuery = QueryBuilders.boolQuery(); + itemTypeQuery.minimumShouldMatch(1); + for (String itemType : itemTypes) { + itemTypeQuery.should(getItemTypeQueryBuilder(itemType)); + } + + BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery(); + wrappedQuery.filter(itemTypeQuery); + wrappedQuery.must(originalQuery); + return wrappedQuery; + } + return originalQuery; } private QueryBuilder getItemTypeQueryBuilder(String itemType) { diff --git a/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java 
b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java new file mode 100644 index 000000000..8fff8dea6 --- /dev/null +++ b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.client.tasks.TaskSubmissionResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; + +import java.io.IOException; + +import static java.util.Collections.emptySet; + +/** + * A custom Rest high level client that provide a way of using Task system on updateByQuery and deleteByQuery, + * by returning the response immediately (wait_for_completion set to false) + * see org.elasticsearch.client.RestHighLevelClient for original code. + */ +public class CustomRestHighLevelClient extends RestHighLevelClient { + + public CustomRestHighLevelClient(RestClientBuilder restClientBuilder) { + super(restClientBuilder); + } + + /** + * Executes a delete by query request. 
+ * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html"> + * Delete By Query API on elastic.co</a> + * + * @param deleteByQueryRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final TaskSubmissionResponse submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + deleteByQueryRequest, innerDeleteByQueryRequest -> { + Request request = RequestConverters.deleteByQuery(innerDeleteByQueryRequest); + request.addParameter("wait_for_completion", "false"); + return request; + }, options, TaskSubmissionResponse::fromXContent, emptySet() + ); + } + + /** + * Executes an update by query request. + * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html"> + * Update By Query API on elastic.co</a> + * + * @param updateByQueryRequest the request + * @param options the request options (e.g. 
headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final TaskSubmissionResponse submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + updateByQueryRequest, innerUpdateByQueryRequest -> { + Request request = RequestConverters.updateByQuery(updateByQueryRequest); + request.addParameter("wait_for_completion", "false"); + return request; + }, options, TaskSubmissionResponse::fromXContent, emptySet() + ); + } +} diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 507d8789f..32efdd022 100644 --- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -63,7 +63,9 @@ <cm:property name="maximalElasticSearchVersion" value="8.0.0" /> <cm:property name="aggregateQueryBucketSize" value="5000" /> - <cm:property name="clientSocketTimeout" value="" /> + <cm:property name="clientSocketTimeout" value="-1" /> + <cm:property name="taskWaitingTimeout" value="3600000" /> + <cm:property name="taskWaitingPollingInterval" value="1000" /> <cm:property name="aggQueryMaxResponseSizeHttp" value="" /> <cm:property name="aggQueryThrowOnMissingDocs" value="false" /> <cm:property name="itemTypeToRefreshPolicy" value="" /> @@ -151,6 +153,8 @@ <property name="itemTypeToRefreshPolicy" value="${es.itemTypeToRefreshPolicy}" /> <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" /> + <property name="taskWaitingTimeout" value="${es.taskWaitingTimeout}" /> + <property name="taskWaitingPollingInterval" value="${es.taskWaitingPollingInterval}" /> <property name="metricsService" ref="metricsService" /> <property name="useBatchingForSave" value="${es.useBatchingForSave}" /> diff 
--git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index 086941e80..224d01110 100644 --- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -73,8 +73,22 @@ maximumIdsQueryCount=${org.apache.unomi.elasticsearch.maximumIdsQueryCount:-5000 # Disable partitions on aggregation queries for past events. pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePartitions:-false} -# max socket timeout in millis -clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-} +# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the timeout for waiting for data or, put differently, a maximum period of inactivity between two consecutive data packets. +# A timeout value of zero is interpreted as an infinite timeout. A negative value is interpreted as undefined (system default). +# Default: -1 +clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:--1} + +# Defines the waiting for task completion timeout in milliseconds. +# Some operations like update_by_query and delete_by_query are delegated to ElasticSearch using tasks. +# For consistency the thread that triggers one of those operations will wait for the task to be completed on the ElasticSearch side. +# This timeout configuration is here to avoid blocking the thread indefinitely, in case of very long running tasks. +# A timeout value of zero or negative is interpreted as an infinite timeout. 
+# Default: 3600000 (1 hour) +taskWaitingTimeout=${org.apache.unomi.elasticsearch.taskWaitingTimeout:-3600000} + +# Defines the polling interval in milliseconds, which is used to check if a task is completed on the ElasticSearch side +# Default: 1000 (1 second) +taskWaitingPollingInterval=${org.apache.unomi.elasticsearch.taskWaitingPollingInterval:-1000} # refresh policy per item type in Json. # Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is NONE. diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java index 29c196a2b..0fe374616 100644 --- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java @@ -248,6 +248,20 @@ public interface PersistenceService { return updateWithQueryAndStoredScript(null, clazz, scripts, scriptParams, conditions); } + /** + * Updates the items of the specified classes by a query, + * based on the provided stored scripts and script parameters. + * This one is able to perform an update on multiple types in a single run; be careful with your query as it will be performed on all of them. 
+ * + * @param classes classes of items to update, be careful: all of them will be submitted to the update for all scripts/conditions + * @param scripts stored script names + * @param scriptParams script params array + * @param conditions conditions array + * @param waitForComplete if true, wait for the ES execution to be complete + * @return {@code true} if the update was successful, {@code false} otherwise + */ + boolean updateWithQueryAndStoredScript(Class<?>[] classes, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions, boolean waitForComplete); + /** * @deprecated use {@link #updateWithQueryAndStoredScript(Class, String[], Map[], Condition[])} */ diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java index 14b6f9f52..59ef60646 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java @@ -167,8 +167,7 @@ public class MergeProfilesOnPropertyAction implements ActionExecutor { Map<String, Object>[] scriptParams = new Map[]{Collections.singletonMap("profileId", masterProfileId)}; Condition[] conditions = new Condition[]{profileIdsCondition}; - persistenceService.updateWithQueryAndStoredScript(Session.class, scripts, scriptParams, conditions); - persistenceService.updateWithQueryAndStoredScript(Event.class, scripts, scriptParams, conditions); + persistenceService.updateWithQueryAndStoredScript(new Class[]{Session.class, Event.class}, scripts, scriptParams, conditions, false); } else { for (String mergedProfileId : mergedProfileIds) { privacyService.anonymizeBrowsingData(mergedProfileId);