This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new 64169942b UNOMI-784: use tasks to perform update_by_query and 
delete_by_query (#636)
64169942b is described below

commit 64169942b2ca16ee4bd9d1a5cd2f047fd3c194d8
Author: kevan Jahanshahi <jke...@apache.org>
AuthorDate: Thu Aug 3 12:28:34 2023 +0200

    UNOMI-784: use tasks to perform update_by_query and delete_by_query (#636)
    
    * UNOMI-784: set ES client socket timeout to 80sec instead of 30sec by 
default.
    
    * use wait_for_completion for update by script / delete queries
    
    * Fix integration test
    
    * UNOMI-784: simplify the client wrapper
    
    * UNOMI-784: add some clarity to config related to socket timeout
    
    * UNOMI-784: wait for task to complete
    
    * UNOMI-784: improve merge updateByQuery to perform a single task instead 
of two separate ones
    
    * UNOMI-784: Typo
    
    * UNOMI-784: add configurations regarding tasks waiting timeout and polling 
interval
    
    * UNOMI-784: set task completion log level to debug
    
    * Don't wait for task to be complete for merge profile ES script
    
    * remove non required test
    
    * UNOMI-784: Try to speed up the tests and set waitForCompletion to be true 
in service
    
    ---------
    
    Co-authored-by: David Griffon <dgrif...@jahia.com>
---
 itests/pom.xml                                     |   1 +
 .../test/java/org/apache/unomi/itests/BaseIT.java  |  20 +-
 .../org/apache/unomi/itests/ProfileServiceIT.java  |  13 +-
 .../java/org/apache/unomi/itests/SegmentIT.java    |   7 +-
 .../main/resources/etc/custom.system.properties    |  15 +-
 .../ElasticSearchPersistenceServiceImpl.java       | 211 +++++++++++++--------
 .../client/CustomRestHighLevelClient.java          |  76 ++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml     |   6 +-
 .../org.apache.unomi.persistence.elasticsearch.cfg |  18 +-
 .../unomi/persistence/spi/PersistenceService.java  |  14 ++
 .../actions/MergeProfilesOnPropertyAction.java     |   3 +-
 11 files changed, 282 insertions(+), 102 deletions(-)

diff --git a/itests/pom.xml b/itests/pom.xml
index 24e8b65ce..8bd5fd908 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -226,6 +226,7 @@
                             </environmentVariables>
                             <instanceSettings>
                                 <properties>
+                                    <xpack.ml.enabled>false</xpack.ml.enabled>
                                     
<path.repo>${project.build.directory}/snapshots_repository</path.repo>
                                     
<cluster.routing.allocation.disk.threshold_enabled>false</cluster.routing.allocation.disk.threshold_enabled>
                                     
<http.cors.allow-origin>*</http.cors.allow-origin>
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java 
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 7e70e40ab..28cf43c7b 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -93,12 +93,7 @@ import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -256,6 +251,7 @@ public abstract class BaseIT extends KarafTestSupport {
                 editConfigurationFilePut("etc/custom.system.properties", 
"org.apache.unomi.graphql.feature.activated", "true"),
                 editConfigurationFilePut("etc/custom.system.properties", 
"org.apache.unomi.elasticsearch.cluster.name", "contextElasticSearchITests"),
                 editConfigurationFilePut("etc/custom.system.properties", 
"org.apache.unomi.elasticsearch.addresses", "localhost:9400"),
+                editConfigurationFilePut("etc/custom.system.properties", 
"org.apache.unomi.elasticsearch.taskWaitingPollingInterval", "50"),
 
                 
systemProperty("org.ops4j.pax.exam.rbc.rmi.port").value("1199"),
                 
systemProperty("org.apache.unomi.hazelcast.group.name").value("cellar"),
@@ -381,13 +377,23 @@ public abstract class BaseIT extends KarafTestSupport {
         persistenceService = getService(PersistenceService.class);
         definitionsService = getService(DefinitionsService.class);
         rulesService = getService(RulesService.class);
+        segmentService = getService(SegmentService.class);
     }
 
     public void updateConfiguration(String serviceName, String configPid, 
String propName, Object propValue)
             throws InterruptedException, IOException {
+        Map<String, Object> props = new HashMap<>();
+        props.put(propName, propValue);
+        updateConfiguration(serviceName, configPid, props);
+    }
+
+    public void updateConfiguration(String serviceName, String configPid, 
Map<String, Object> propsToSet)
+            throws InterruptedException, IOException {
         org.osgi.service.cm.Configuration cfg = 
configurationAdmin.getConfiguration(configPid);
         Dictionary<String, Object> props = cfg.getProperties();
-        props.put(propName, propValue);
+        for (Map.Entry<String, Object> propToSet : propsToSet.entrySet()) {
+            props.put(propToSet.getKey(), propToSet.getValue());
+        }
 
         waitForReRegistration(serviceName, () -> {
             try {
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java 
b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 306f9d8c8..623904938 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -155,8 +155,17 @@ public class ProfileServiceIT extends BaseIT {
             throws InterruptedException, NoSuchFieldException, 
IllegalAccessException, IOException {
         boolean throwExceptionCurrent = false;
         Configuration elasticSearchConfiguration = 
configurationAdmin.getConfiguration("org.apache.unomi.persistence.elasticsearch");
-        if (elasticSearchConfiguration != null) {
-            throwExceptionCurrent = Boolean.getBoolean((String) 
elasticSearchConfiguration.getProperties().get("throwExceptions"));
+        if (elasticSearchConfiguration != null && 
elasticSearchConfiguration.getProperties().get("throwExceptions") != null) {
+            try {
+                if 
(elasticSearchConfiguration.getProperties().get("throwExceptions") instanceof 
String) {
+                    throwExceptionCurrent = Boolean.parseBoolean((String) 
elasticSearchConfiguration.getProperties().get("throwExceptions"));
+                } else {
+                    // already a boolean
+                    throwExceptionCurrent = (Boolean) 
elasticSearchConfiguration.getProperties().get("throwExceptions");
+                }
+            } catch (Throwable e) {
+                // Not able to cast the property
+            }
         }
 
         updateConfiguration(PersistenceService.class.getName(), 
"org.apache.unomi.persistence.elasticsearch", "throwExceptions", true);
diff --git a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java 
b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
index 12fb99da8..3b3aa9314 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SegmentIT.java
@@ -39,6 +39,7 @@ import org.ops4j.pax.exam.junit.PaxExam;
 import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
 import org.ops4j.pax.exam.spi.reactors.PerSuite;
 import org.ops4j.pax.exam.util.Filter;
+import org.osgi.service.cm.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,11 +47,7 @@ import javax.inject.Inject;
 import java.text.SimpleDateFormat;
 import java.time.LocalDate;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 @RunWith(PaxExam.class)
 @ExamReactorStrategy(PerSuite.class)
diff --git a/package/src/main/resources/etc/custom.system.properties 
b/package/src/main/resources/etc/custom.system.properties
index eb65ab026..d421c7b43 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -132,7 +132,20 @@ 
org.apache.unomi.elasticsearch.defaultIndex.indexMaxDocValueFieldsSearch=${env:U
 
org.apache.unomi.elasticsearch.defaultQueryLimit=${env:UNOMI_ELASTICSEARCH_DEFAULTQUERYLIMIT:-10}
 
org.apache.unomi.elasticsearch.aggregateQueryBucketSize=${env:UNOMI_ELASTICSEARCH_AGGREGATEBUCKETSIZE:-5000}
 
org.apache.unomi.elasticsearch.maximumIdsQueryCount=${env:UNOMI_ELASTICSEARCH_MAXIMUMIDSQUERYCOUNT:-5000}
-org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-}
+# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the 
timeout for waiting for data or, put differently, a maximum period of 
inactivity between two consecutive data packets.
+# A timeout value of zero is interpreted as an infinite timeout. A negative 
value is interpreted as undefined (system default).
+# Default: -1 (System default)
+org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:--1}
+# Defines the waiting for task completion timeout in milliseconds.
+# Some operations like update_by_query and delete_by_query are delegated to 
ElasticSearch using tasks
+# For consistency the thread that triggers one of those operations will wait 
for the task to be completed on ElasticSearch side.
+# This timeout configuration is here to ensure the thread is not blocked 
indefinitely, in case of very long running tasks.
+# A timeout value of zero or negative is interpreted as an infinite timeout.
+# Default: 3600000 (one hour)
+org.apache.unomi.elasticsearch.taskWaitingTimeout=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_TIMEOUT:-3600000}
+# Defines the polling interval in milliseconds, which is used to check if the 
task is completed on ElasticSearch side
+# Default: 1000 (1 second)
+org.apache.unomi.elasticsearch.taskWaitingPollingInterval=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_POLLING_INTERVAL:-1000}
 
org.apache.unomi.elasticsearch.pastEventsDisablePartitions=${env:UNOMI_ELASTICSEARCH_PAST_EVENTS_DISABLE_PARTITIONS:-false}
 
org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_THROW_ON_MISSING_DOCS:-false}
 
org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_MAX_RESPONSE_SIZE_HTTP:-}
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 2ad652387..8ca6526f1 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -64,13 +64,16 @@ import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.*;
+import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.core.CountRequest;
 import org.elasticsearch.client.core.CountResponse;
 import org.elasticsearch.client.core.MainResponse;
 import org.elasticsearch.client.indexlifecycle.*;
 import org.elasticsearch.client.indices.*;
+import org.elasticsearch.client.tasks.GetTaskRequest;
+import org.elasticsearch.client.tasks.GetTaskResponse;
+import org.elasticsearch.client.tasks.TaskSubmissionResponse;
 import org.elasticsearch.cluster.metadata.AliasMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -78,9 +81,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.DistanceUnit;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.*;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.*;
 import org.elasticsearch.index.reindex.*;
@@ -109,6 +110,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.tasks.TaskId;
 import org.osgi.framework.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,7 +146,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
     private static final String ROLLOVER_LIFECYCLE_NAME = 
"unomi-rollover-policy";
 
     private boolean throwExceptions = false;
-    private RestHighLevelClient client;
+    private CustomRestHighLevelClient client;
     private BulkProcessor bulkProcessor;
     private String elasticSearchAddresses;
     private List<String> elasticSearchAddressList = new ArrayList<>();
@@ -168,6 +170,8 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
 
     private Integer defaultQueryLimit = 10;
     private Integer removeByQueryTimeoutInMinutes = 10;
+    private Integer taskWaitingTimeout = 3600000;
+    private Integer taskWaitingPollingInterval = 1000;
 
     private String bulkProcessorConcurrentRequests = "1";
     private String bulkProcessorBulkActions = "1000";
@@ -434,6 +438,18 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         this.logLevelRestClient = logLevelRestClient;
     }
 
+    public void setTaskWaitingTimeout(String taskWaitingTimeout) {
+        if (StringUtils.isNumeric(taskWaitingTimeout)) {
+            this.taskWaitingTimeout = Integer.parseInt(taskWaitingTimeout);
+        }
+    }
+
+    public void setTaskWaitingPollingInterval(String 
taskWaitingPollingInterval) {
+        if (StringUtils.isNumeric(taskWaitingPollingInterval)) {
+            this.taskWaitingPollingInterval = 
Integer.parseInt(taskWaitingPollingInterval);
+        }
+    }
+
     public void start() throws Exception {
 
         // Work around to avoid ES Logs regarding the deprecated 
[ignore_throttled] parameter
@@ -505,7 +521,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
         logger.info(this.getClass().getName() + " service started 
successfully.");
     }
 
-    private void buildClient() {
+    private void buildClient() throws NoSuchFieldException, 
IllegalAccessException {
         List<Node> nodeList = new ArrayList<>();
         for (String elasticSearchAddress : elasticSearchAddressList) {
             String[] elasticSearchAddressParts = 
elasticSearchAddress.split(":");
@@ -560,7 +576,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
         });
 
         logger.info("Connecting to ElasticSearch persistence backend using 
cluster name " + clusterName + " and index prefix " + indexPrefix + "...");
-        client = new RestHighLevelClient(clientBuilder);
+        client = new CustomRestHighLevelClient(clientBuilder);
     }
 
     public BulkProcessor getBulkProcessor() {
@@ -1087,64 +1103,59 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         for (int i = 0; i < scripts.length; i++) {
             builtScripts[i] = new Script(ScriptType.INLINE, "painless", 
scripts[i], scriptParams[i]);
         }
-        return updateWithQueryAndScript(clazz, builtScripts, conditions);
+        return updateWithQueryAndScript(new Class<?>[]{clazz}, builtScripts, 
conditions, true);
     }
 
     @Override
     public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> 
clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] 
conditions) {
-        return updateWithQueryAndStoredScript(clazz, scripts, scriptParams, 
conditions);
+        return updateWithQueryAndStoredScript(new Class<?>[]{clazz}, scripts, 
scriptParams, conditions, true);
     }
 
     @Override
     public boolean updateWithQueryAndStoredScript(Class<?> clazz, String[] 
scripts, Map<String, Object>[] scriptParams, Condition[] conditions) {
+        return updateWithQueryAndStoredScript(new Class<?>[]{clazz}, scripts, 
scriptParams, conditions, true);
+    }
+
+    @Override
+    public boolean updateWithQueryAndStoredScript(Class<?>[] classes, String[] 
scripts, Map<String, Object>[] scriptParams, Condition[] conditions, boolean 
waitForComplete) {
         Script[] builtScripts = new Script[scripts.length];
         for (int i = 0; i < scripts.length; i++) {
             builtScripts[i] = new Script(ScriptType.STORED, null, scripts[i], 
scriptParams[i]);
         }
-        return updateWithQueryAndScript(clazz, builtScripts, conditions);
+        return updateWithQueryAndScript(classes, builtScripts, conditions, 
waitForComplete);
     }
 
-    private boolean updateWithQueryAndScript(final Class<?> clazz, final 
Script[] scripts, final Condition[] conditions) {
+    private boolean updateWithQueryAndScript(final Class<?>[] classes, final 
Script[] scripts, final Condition[] conditions, boolean waitForComplete) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
             protected Boolean execute(Object... args) throws Exception {
-                try {
-                    String itemType = Item.getItemType(clazz);
-                    String index = getIndex(itemType);
+                String[] itemTypes = 
Arrays.stream(classes).map(Item::getItemType).toArray(String[]::new);
+                String[] indices = Arrays.stream(itemTypes).map(itemType -> 
getIndexNameForQuery(itemType)).toArray(String[]::new);
 
+                try {
                     for (int i = 0; i < scripts.length; i++) {
-                        RefreshRequest refreshRequest = new 
RefreshRequest(index);
+                        RefreshRequest refreshRequest = new 
RefreshRequest(indices);
                         client.indices().refresh(refreshRequest, 
RequestOptions.DEFAULT);
-                        QueryBuilder queryBuilder = 
conditionESQueryBuilderDispatcher.buildFilter(conditions[i]);
 
-                        UpdateByQueryRequest updateByQueryRequest = new 
UpdateByQueryRequest(index);
+                        QueryBuilder queryBuilder = 
conditionESQueryBuilderDispatcher.buildFilter(conditions[i]);
+                        UpdateByQueryRequest updateByQueryRequest = new 
UpdateByQueryRequest(indices);
                         updateByQueryRequest.setConflicts("proceed");
                         updateByQueryRequest.setMaxRetries(1000);
                         updateByQueryRequest.setSlices(2);
                         updateByQueryRequest.setScript(scripts[i]);
-                        
updateByQueryRequest.setQuery(isItemTypeSharingIndex(itemType) ? 
wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder);
-
-                        BulkByScrollResponse response = 
client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+                        
updateByQueryRequest.setQuery(wrapWithItemsTypeQuery(itemTypes, queryBuilder));
 
-                        if (response.getBulkFailures().size() > 0) {
-                            for (BulkItemResponse.Failure failure : 
response.getBulkFailures()) {
-                                logger.error("Failure : cause={} , 
message={}", failure.getCause(), failure.getMessage());
-                            }
+                        TaskSubmissionResponse taskResponse = 
client.submitUpdateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+                        if (taskResponse == null) {
+                            logger.error("update with query and script: no 
response returned for query: {}", queryBuilder);
+                        } else if (waitForComplete) {
+                            waitForTaskComplete(updateByQueryRequest, 
taskResponse);
                         } else {
-                            logger.info("Update with query and script 
processed {} entries in {}.", response.getUpdated(), 
response.getTook().toString());
-                        }
-                        if (response.isTimedOut()) {
-                            logger.error("Update with query and script ended 
with timeout!");
-                        }
-                        if (response.getVersionConflicts() > 0) {
-                            logger.warn("Update with query and script ended 
with {} version conflicts!", response.getVersionConflicts());
-                        }
-                        if (response.getNoops() > 0) {
-                            logger.warn("Update Bwith query and script ended 
with {} noops!", response.getNoops());
+                            logger.debug("ES task started {}", 
taskResponse.getTask());
                         }
                     }
                     return true;
                 } catch (IndexNotFoundException e) {
-                    throw new Exception("No index found for itemType=" + 
clazz.getName(), e);
+                    throw new Exception("No index found for itemTypes=" + 
String.join(",", itemTypes), e);
                 } catch (ScriptException e) {
                     logger.error("Error in the update script : {}\n{}\n{}", 
e.getScript(), e.getDetailedMessage(), e.getScriptStack());
                     throw new Exception("Error in the update script");
@@ -1158,6 +1169,53 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         }
     }
 
+    private void waitForTaskComplete(AbstractBulkByScrollRequest request, 
TaskSubmissionResponse response) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Waiting task [{}]: [{}] using query: [{}], polling 
every {}ms with a timeout configured to {}ms",
+                    response.getTask(), request.toString(), 
request.getSearchRequest().source().query(), taskWaitingPollingInterval, 
taskWaitingTimeout);
+        }
+        long start = System.currentTimeMillis();
+        new InClassLoaderExecute<Void>(metricsService, 
this.getClass().getName() + ".waitForTask", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Void execute(Object... args) throws Exception {
+
+                TaskId taskId = new TaskId(response.getTask());
+                while (true){
+                    Optional<GetTaskResponse> getTaskResponseOptional = 
client.tasks().get(new GetTaskRequest(taskId.getNodeId(), taskId.getId()), 
RequestOptions.DEFAULT);
+                    if (getTaskResponseOptional.isPresent()) {
+                        GetTaskResponse getTaskResponse = 
getTaskResponseOptional.get();
+                        if (getTaskResponse.isCompleted()) {
+                            if (logger.isDebugEnabled()) {
+                                long millis = 
getTaskResponse.getTaskInfo().getRunningTimeNanos() / 1_000_000;
+                                long seconds = millis / 1000;
+
+                                logger.debug("Waiting task [{}]: Finished in 
{} {}", taskId,
+                                        seconds >= 1 ? seconds : millis,
+                                        seconds >= 1 ? "seconds" : 
"milliseconds");
+                            }
+                            break;
+                        } else {
+                            if ((start + taskWaitingTimeout) < 
System.currentTimeMillis()) {
+                                logger.error("Waiting task [{}]: Exceeded 
configured timeout ({}ms), aborting wait process", taskId, taskWaitingTimeout);
+                                break;
+                            }
+
+                            try {
+                                Thread.sleep(taskWaitingPollingInterval);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                throw new IllegalStateException("Waiting task 
[{}]: interrupted");
+                            }
+                        }
+                    } else {
+                        logger.error("Waiting task [{}]: No task found", 
taskId);
+                        break;
+                    }
+                }
+                return null;
+            }
+        }.catchingExecuteInClassLoader(true);
+    }
+
     @Override
     public boolean storeScripts(Map<String, String> scripts) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".storeScripts", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
@@ -1295,7 +1353,7 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         try {
             String itemType = Item.getItemType(clazz);
             final DeleteByQueryRequest deleteByQueryRequest = new 
DeleteByQueryRequest(getIndexNameForQuery(itemType))
-                    .setQuery(isItemTypeSharingIndex(itemType) ? 
wrapWithItemTypeQuery(itemType, queryBuilder) : queryBuilder)
+                    .setQuery(wrapWithItemTypeQuery(itemType, queryBuilder))
                     // Setting slices to auto will let Elasticsearch choose 
the number of slices to use.
                     // This setting will use one slice per shard, up to a 
certain limit.
                     // The delete request will be more efficient and faster 
than no slicing.
@@ -1309,45 +1367,14 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                     // So we increase default timeout of 1min to 10min
                     
.setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
 
-            BulkByScrollResponse bulkByScrollResponse = 
client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+            TaskSubmissionResponse taskResponse = 
client.submitDeleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
 
-            if (bulkByScrollResponse == null) {
+            if (taskResponse == null) {
                 logger.error("Remove by query: no response returned for query: 
{}", queryBuilder);
                 return false;
             }
 
-            if (bulkByScrollResponse.isTimedOut()) {
-                logger.warn("Remove by query: timed out because took more than 
{} minutes for query: {}", removeByQueryTimeoutInMinutes, queryBuilder);
-            }
-
-            if ((bulkByScrollResponse.getSearchFailures() != null && 
bulkByScrollResponse.getSearchFailures().size() > 0) ||
-                    bulkByScrollResponse.getBulkFailures() != null && 
bulkByScrollResponse.getBulkFailures().size() > 0) {
-                logger.warn("Remove by query: we found some failure during the 
process of query: {}", queryBuilder);
-
-                if (bulkByScrollResponse.getSearchFailures() != null && 
bulkByScrollResponse.getSearchFailures().size() > 0) {
-                    for (ScrollableHitSource.SearchFailure searchFailure : 
bulkByScrollResponse.getSearchFailures()) {
-                        logger.warn("Remove by query, search failure: {}", 
searchFailure.toString());
-                    }
-                }
-
-                if (bulkByScrollResponse.getBulkFailures() != null && 
bulkByScrollResponse.getBulkFailures().size() > 0) {
-                    for (BulkItemResponse.Failure bulkFailure : 
bulkByScrollResponse.getBulkFailures()) {
-                        logger.warn("Remove by query, bulk failure: {}", 
bulkFailure.toString());
-                    }
-                }
-            }
-
-            if (logger.isDebugEnabled()) {
-                logger.debug("Remove by query: took {}, deleted docs: {}, 
batches executed: {}, skipped docs: {}, version conflicts: {}, search retries: 
{}, bulk retries: {}, for query: {}",
-                        
bulkByScrollResponse.getTook().toHumanReadableString(1),
-                        bulkByScrollResponse.getDeleted(),
-                        bulkByScrollResponse.getBatches(),
-                        bulkByScrollResponse.getNoops(),
-                        bulkByScrollResponse.getVersionConflicts(),
-                        bulkByScrollResponse.getSearchRetries(),
-                        bulkByScrollResponse.getBulkRetries(),
-                        queryBuilder);
-            }
+            waitForTaskComplete(deleteByQueryRequest, taskResponse);
 
             return true;
         } catch (Exception e) {
@@ -1942,7 +1969,7 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
 
                 CountRequest countRequest = new 
CountRequest(getIndexNameForQuery(itemType));
                 SearchSourceBuilder searchSourceBuilder = new 
SearchSourceBuilder();
-                searchSourceBuilder.query(isItemTypeSharingIndex(itemType) ? 
wrapWithItemTypeQuery(itemType, filter) : filter);
+                searchSourceBuilder.query(wrapWithItemTypeQuery(itemType, 
filter));
                 countRequest.source(searchSourceBuilder);
                 CountResponse response = client.count(countRequest, 
RequestOptions.DEFAULT);
                 return response.getCount();
@@ -1977,7 +2004,7 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                     SearchSourceBuilder searchSourceBuilder = new 
SearchSourceBuilder()
                             .fetchSource(true)
                             .seqNoAndPrimaryTerm(true)
-                            .query(isItemTypeSharingIndex(itemType) ? 
wrapWithItemTypeQuery(itemType, query) : query)
+                            .query(wrapWithItemTypeQuery(itemType, query))
                             .size(size < 0 ? defaultQueryLimit : size)
                             .from(offset);
                     if (scrollTimeValidity != null) {
@@ -2281,15 +2308,12 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                     }
 
                     if (filter != null) {
-                        searchSourceBuilder.query(isItemTypeSharingIndex ?
-                                wrapWithItemTypeQuery(itemType, 
conditionESQueryBuilderDispatcher.buildFilter(filter)) :
-                                
conditionESQueryBuilderDispatcher.buildFilter(filter));
+                        
searchSourceBuilder.query(wrapWithItemTypeQuery(itemType, 
conditionESQueryBuilderDispatcher.buildFilter(filter)));
                     }
                 } else {
                     if (filter != null) {
-                        AggregationBuilder filterAggregation = 
AggregationBuilders.filter("filter", isItemTypeSharingIndex ?
-                                wrapWithItemTypeQuery(itemType, 
conditionESQueryBuilderDispatcher.buildFilter(filter)) :
-                                
conditionESQueryBuilderDispatcher.buildFilter(filter));
+                        AggregationBuilder filterAggregation = 
AggregationBuilders.filter("filter",
+                                wrapWithItemTypeQuery(itemType, 
conditionESQueryBuilderDispatcher.buildFilter(filter)));
                         for (AggregationBuilder aggregationBuilder : 
lastAggregation) {
                             
filterAggregation.subAggregation(aggregationBuilder);
                         }
@@ -2666,10 +2690,33 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
     }
 
     private QueryBuilder wrapWithItemTypeQuery(String itemType, QueryBuilder 
originalQuery) {
-        BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery();
-        wrappedQuery.must(getItemTypeQueryBuilder(itemType));
-        wrappedQuery.must(originalQuery);
-        return wrappedQuery;
+        if (isItemTypeSharingIndex(itemType)) {
+            BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery();
+            wrappedQuery.must(getItemTypeQueryBuilder(itemType));
+            wrappedQuery.must(originalQuery);
+            return wrappedQuery;
+        }
+        return originalQuery;
+    }
+
+    private QueryBuilder wrapWithItemsTypeQuery(String[] itemTypes, 
QueryBuilder originalQuery) {
+        if (itemTypes.length == 1) {
+            return wrapWithItemTypeQuery(itemTypes[0], originalQuery);
+        }
+
+        if (Arrays.stream(itemTypes).anyMatch(this::isItemTypeSharingIndex)) {
+            BoolQueryBuilder itemTypeQuery = QueryBuilders.boolQuery();
+            itemTypeQuery.minimumShouldMatch(1);
+            for (String itemType : itemTypes) {
+                itemTypeQuery.should(getItemTypeQueryBuilder(itemType));
+            }
+
+            BoolQueryBuilder wrappedQuery = QueryBuilders.boolQuery();
+            wrappedQuery.filter(itemTypeQuery);
+            wrappedQuery.must(originalQuery);
+            return wrappedQuery;
+        }
+        return originalQuery;
     }
 
     private QueryBuilder getItemTypeQueryBuilder(String itemType) {
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
 
b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
new file mode 100644
index 000000000..8fff8dea6
--- /dev/null
+++ 
b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.elasticsearch.client.tasks.TaskSubmissionResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+
+import java.io.IOException;
+
+import static java.util.Collections.emptySet;
+
+/**
+ * A custom Rest high level client that provide a way of using Task system on 
updateByQuery and deleteByQuery,
+ * by returning the response immediately (wait_for_completion set to false)
+ * see org.elasticsearch.client.RestHighLevelClient for original code.
+ */
+public class CustomRestHighLevelClient extends RestHighLevelClient {
+
+    public CustomRestHighLevelClient(RestClientBuilder restClientBuilder) {
+        super(restClientBuilder);
+    }
+
+    /**
+     * Executes a delete by query request.
+     * See <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html";>
+     * Delete By Query API on elastic.co</a>
+     *
+     * @param deleteByQueryRequest the request
+     * @param options              the request options (e.g. headers), use 
{@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the response
+     */
+    public final TaskSubmissionResponse 
submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions 
options) throws IOException {
+        return performRequestAndParseEntity(
+                deleteByQueryRequest, innerDeleteByQueryRequest -> {
+                    Request request = 
RequestConverters.deleteByQuery(innerDeleteByQueryRequest);
+                    request.addParameter("wait_for_completion", "false");
+                    return request;
+                }, options, TaskSubmissionResponse::fromXContent, emptySet()
+        );
+    }
+
+    /**
+     * Executes a update by query request.
+     * See <a 
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html";>
+     * Update By Query API on elastic.co</a>
+     *
+     * @param updateByQueryRequest the request
+     * @param options              the request options (e.g. headers), use 
{@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the response
+     */
+    public final TaskSubmissionResponse 
submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions 
options) throws IOException {
+        return performRequestAndParseEntity(
+                updateByQueryRequest, innerUpdateByQueryRequest -> {
+                    Request request = 
RequestConverters.updateByQuery(updateByQueryRequest);
+                    request.addParameter("wait_for_completion", "false");
+                    return request;
+                }, options, TaskSubmissionResponse::fromXContent, emptySet()
+        );
+    }
+}
diff --git 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 507d8789f..32efdd022 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -63,7 +63,9 @@
             <cm:property name="maximalElasticSearchVersion" value="8.0.0" />
 
             <cm:property name="aggregateQueryBucketSize" value="5000" />
-            <cm:property name="clientSocketTimeout" value="" />
+            <cm:property name="clientSocketTimeout" value="-1" />
+            <cm:property name="taskWaitingTimeout" value="3600000" />
+            <cm:property name="taskWaitingPollingInterval" value="1000" />
             <cm:property name="aggQueryMaxResponseSizeHttp" value="" />
             <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
             <cm:property name="itemTypeToRefreshPolicy" value="" />
@@ -151,6 +153,8 @@
         <property name="itemTypeToRefreshPolicy" 
value="${es.itemTypeToRefreshPolicy}" />
 
         <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" 
/>
+        <property name="taskWaitingTimeout" value="${es.taskWaitingTimeout}" />
+        <property name="taskWaitingPollingInterval" 
value="${es.taskWaitingPollingInterval}" />
 
         <property name="metricsService" ref="metricsService" />
         <property name="useBatchingForSave" value="${es.useBatchingForSave}" />
diff --git 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index 086941e80..224d01110 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -73,8 +73,22 @@ 
maximumIdsQueryCount=${org.apache.unomi.elasticsearch.maximumIdsQueryCount:-5000
 # Disable partitions on aggregation queries for past events.
 
pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePartitions:-false}
 
-# max socket timeout in millis
-clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
+# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the timeout for waiting for data or, put differently, the maximum period of inactivity between two consecutive data packets.
+# A timeout value of zero is interpreted as an infinite timeout. A negative 
value is interpreted as undefined (system default).
+# Default: -1
+clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:--1}
+
+# Defines the timeout, in milliseconds, for waiting on task completion.
+# Some operations, like update_by_query and delete_by_query, are delegated to ElasticSearch as tasks.
+# For consistency, the thread that triggers one of those operations will wait for the task to complete on the ElasticSearch side.
+# This timeout ensures the thread is not blocked indefinitely in case of very long running tasks.
+# A timeout value of zero or a negative value is interpreted as an infinite timeout.
+# Default: 3600000 (1 hour)
+taskWaitingTimeout=${org.apache.unomi.elasticsearch.taskWaitingTimeout:-3600000}
+
+# Defines the polling interval, in milliseconds, used to check whether a task has completed on the ElasticSearch side.
+# Default: 1000 (1 second)
+taskWaitingPollingInterval=${org.apache.unomi.elasticsearch.taskWaitingPollingInterval:-1000}
 
 # refresh policy per item type in Json.
 # Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is 
NONE.
diff --git 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 29c196a2b..0fe374616 100644
--- 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -248,6 +248,20 @@ public interface PersistenceService {
         return updateWithQueryAndStoredScript(null, clazz, scripts, 
scriptParams, conditions);
     }
 
+    /**
+     * Updates the items of the specified classes, selected by the given conditions,
+     * using the provided stored scripts and script parameters.
+     * This variant can perform an update on multiple item types in a single run; be careful
+     * with your conditions, as each one will be applied to all of the given classes.
+     *
+     * @param classes      classes of items to update; note that all of them will be submitted to update for all scripts/conditions
+     * @param scripts      stored script names
+     * @param scriptParams script params array
+     * @param conditions   conditions array
+     * @param waitForComplete if true, wait for the ES execution to be complete
+     * @return {@code true} if the update was successful, {@code false}
+     * otherwise
+     */
+    boolean updateWithQueryAndStoredScript(Class<?>[] classes, String[] 
scripts, Map<String, Object>[] scriptParams, Condition[] conditions, boolean 
waitForComplete);
+
     /**
      * @deprecated use {@link #updateWithQueryAndStoredScript(Class, String[], 
Map[], Condition[])}
      */
diff --git 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 14b6f9f52..59ef60646 100644
--- 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -167,8 +167,7 @@ public class MergeProfilesOnPropertyAction implements 
ActionExecutor {
                     Map<String, Object>[] scriptParams = new 
Map[]{Collections.singletonMap("profileId", masterProfileId)};
                     Condition[] conditions = new 
Condition[]{profileIdsCondition};
 
-                    
persistenceService.updateWithQueryAndStoredScript(Session.class, scripts, 
scriptParams, conditions);
-                    
persistenceService.updateWithQueryAndStoredScript(Event.class, scripts, 
scriptParams, conditions);
+                    persistenceService.updateWithQueryAndStoredScript(new 
Class[]{Session.class, Event.class}, scripts, scriptParams, conditions, false);
                 } else {
                     for (String mergedProfileId : mergedProfileIds) {
                         privacyService.anonymizeBrowsingData(mergedProfileId);


Reply via email to