This is an automated email from the ASF dual-hosted git repository.

dgriffon pushed a commit to branch unomi-1.9.x
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit cda7558926268a70c5d67ef865338fcb5ef8ccb1
Author: kevan Jahanshahi <jke...@apache.org>
AuthorDate: Thu Aug 3 18:05:59 2023 +0200

    UNOMI-784: task backport (#638)
---
 .../test/java/org/apache/unomi/itests/BaseIT.java  |   1 +
 .../org/apache/unomi/itests/ProfileServiceIT.java  |  13 +-
 .../main/resources/etc/custom.system.properties    |  15 +-
 .../ElasticSearchPersistenceServiceImpl.java       | 190 +++++++++++----------
 .../client/CustomRestHighLevelClient.java          |  76 +++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml     |   6 +-
 .../org.apache.unomi.persistence.elasticsearch.cfg |  18 +-
 .../unomi/persistence/spi/PersistenceService.java  |  15 ++
 .../actions/MergeProfilesOnPropertyAction.java     |   3 +-
 9 files changed, 239 insertions(+), 98 deletions(-)

diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java 
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index c8420a324..0ba3ac46f 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -175,6 +175,7 @@ public abstract class BaseIT {
                 editConfigurationFilePut("etc/org.apache.karaf.features.cfg", 
"serviceRequirements", "disable"),
 //                editConfigurationFilePut("etc/org.ops4j.pax.web.cfg", 
"org.osgi.service.http.port", HTTP_PORT),
 //                
systemProperty("org.osgi.service.http.port").value(HTTP_PORT),
+                editConfigurationFilePut("etc/custom.system.properties", 
"org.apache.unomi.elasticsearch.taskWaitingPollingInterval", "50"),
                 
systemProperty("org.ops4j.pax.exam.rbc.rmi.port").value("1199"),
                 
systemProperty("org.apache.unomi.itests.elasticsearch.transport.port").value("9500"),
                 
systemProperty("org.apache.unomi.itests.elasticsearch.cluster.name").value("contextElasticSearchITests"),
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java 
b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index fe82fa749..8264f5883 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -134,8 +134,17 @@ public class ProfileServiceIT extends BaseIT {
     public void testGetProfileWithWrongScrollerIdThrowException() throws 
InterruptedException, NoSuchFieldException, IllegalAccessException, IOException 
{
         boolean throwExceptionCurrent = false;
         Configuration elasticSearchConfiguration = 
configurationAdmin.getConfiguration("org.apache.unomi.persistence.elasticsearch");
-        if (elasticSearchConfiguration != null) {
-            throwExceptionCurrent = Boolean.getBoolean((String) 
elasticSearchConfiguration.getProperties().get("throwExceptions"));
+        if (elasticSearchConfiguration != null && 
elasticSearchConfiguration.getProperties().get("throwExceptions") != null) {
+            try {
+                if 
(elasticSearchConfiguration.getProperties().get("throwExceptions") instanceof 
String) {
+                    throwExceptionCurrent = Boolean.parseBoolean((String) 
elasticSearchConfiguration.getProperties().get("throwExceptions"));
+                } else {
+                    // already a boolean
+                    throwExceptionCurrent = (Boolean) 
elasticSearchConfiguration.getProperties().get("throwExceptions");
+                }
+            } catch (Throwable e) {
+                // Not able to cast the property
+            }
         }
 
         updateConfiguration(PersistenceService.class.getName(), 
"org.apache.unomi.persistence.elasticsearch", "throwExceptions", true);
diff --git a/package/src/main/resources/etc/custom.system.properties 
b/package/src/main/resources/etc/custom.system.properties
index 0628777f2..5c45700e1 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -118,7 +118,20 @@ 
org.apache.unomi.elasticsearch.defaultIndex.indexMaxDocValueFieldsSearch=${env:U
 
org.apache.unomi.elasticsearch.defaultQueryLimit=${env:UNOMI_ELASTICSEARCH_DEFAULTQUERYLIMIT:-10}
 
org.apache.unomi.elasticsearch.aggregateQueryBucketSize=${env:UNOMI_ELASTICSEARCH_AGGREGATEBUCKETSIZE:-5000}
 
org.apache.unomi.elasticsearch.maximumIdsQueryCount=${env:UNOMI_ELASTICSEARCH_MAXIMUMIDSQUERYCOUNT:-5000}
-org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:-}
+# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the timeout for waiting for data or, put differently, the maximum period of inactivity between two consecutive data packets.
+# A timeout value of zero is interpreted as an infinite timeout. A negative 
value is interpreted as undefined (system default).
+# Default: -1 (System default)
+org.apache.unomi.elasticsearch.clientSocketTimeout=${env:UNOMI_ELASTICSEARCH_CLIENT_SOCKET_TIMEOUT:--1}
+# Defines the timeout, in milliseconds, for waiting for a task to complete.
+# Some operations, like update_by_query and delete_by_query, are delegated to Elasticsearch using tasks.
+# For consistency, the thread that triggers one of those operations waits for the task to complete on the Elasticsearch side.
+# This timeout ensures the thread is not blocked indefinitely in case of very long running tasks.
+# A timeout value of zero or negative is interpreted as an infinite timeout.
+# Default: 3600000 (one hour)
+org.apache.unomi.elasticsearch.taskWaitingTimeout=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_TIMEOUT:-3600000}
+# Defines the polling interval, in milliseconds, used to check whether the task has completed on the Elasticsearch side.
+# Default: 1000 (1 second)
+org.apache.unomi.elasticsearch.taskWaitingPollingInterval=${env:UNOMI_ELASTICSEARCH_TASK_WAITING_POLLING_INTERVAL:-1000}
 
org.apache.unomi.elasticsearch.pastEventsDisablePartitions=${env:UNOMI_ELASTICSEARCH_PAST_EVENTS_DISABLE_PARTITIONS:-false}
 
org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_THROW_ON_MISSING_DOCS:-false}
 
org.apache.unomi.elasticsearch.aggQueryMaxResponseSizeHttp=${env:UNOMI_ELASTICSEARCH_AGG_QUERY_MAX_RESPONSE_SIZE_HTTP:-}
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 6c0a0fa63..589bca1b1 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -62,27 +62,22 @@ import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.*;
 import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.core.CountRequest;
 import org.elasticsearch.client.core.CountResponse;
 import org.elasticsearch.client.core.MainResponse;
 import org.elasticsearch.client.indices.*;
+import org.elasticsearch.client.tasks.GetTaskRequest;
+import org.elasticsearch.client.tasks.GetTaskResponse;
+import org.elasticsearch.client.tasks.TaskSubmissionResponse;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.DistanceUnit;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.*;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.*;
 import org.elasticsearch.index.reindex.*;
@@ -111,6 +106,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.tasks.TaskId;
 import org.osgi.framework.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,18 +126,7 @@ import java.security.SecureRandom;
 import java.security.cert.X509Certificate;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -165,7 +150,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
 
     private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
     private boolean throwExceptions = false;
-    private RestHighLevelClient client;
+    private CustomRestHighLevelClient client;
     private BulkProcessor bulkProcessor;
     private String elasticSearchAddresses;
     private List<String> elasticSearchAddressList = new ArrayList<>();
@@ -190,6 +175,8 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
 
     private Integer defaultQueryLimit = 10;
     private Integer removeByQueryTimeoutInMinutes = 10;
+    private Integer taskWaitingTimeout = 3600000;
+    private Integer taskWaitingPollingInterval = 1000;
 
     private String itemsMonthlyIndexedOverride = "event,session";
     private String bulkProcessorConcurrentRequests = "1";
@@ -394,6 +381,18 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         this.logLevelRestClient = logLevelRestClient;
     }
 
+    public void setTaskWaitingTimeout(String taskWaitingTimeout) {
+        // Non-numeric values (this includes negative numbers) are ignored and the current timeout is kept.
+        if (!StringUtils.isNumeric(taskWaitingTimeout)) { return; }
+        this.taskWaitingTimeout = Integer.parseInt(taskWaitingTimeout);
+    }
+
+    public void setTaskWaitingPollingInterval(String taskWaitingPollingInterval) {
+        if (StringUtils.isNumeric(taskWaitingPollingInterval) && Integer.parseInt(taskWaitingPollingInterval) > 0) {
+            this.taskWaitingPollingInterval = Integer.parseInt(taskWaitingPollingInterval); // 0 would make waitForTaskComplete poll without pausing
+        }
+    }
+
     public void start() throws Exception {
 
         // Work around to avoid ES Logs regarding the deprecated 
[ignore_throttled] parameter
@@ -471,7 +470,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
         logger.info(this.getClass().getName() + " service started 
successfully.");
     }
 
-    private void buildClient() {
+    private void buildClient() throws NoSuchFieldException, 
IllegalAccessException {
         List<Node> nodeList = new ArrayList<>();
         for (String elasticSearchAddress : elasticSearchAddressList) {
             String[] elasticSearchAddressParts = 
elasticSearchAddress.split(":");
@@ -526,7 +525,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
         });
 
         logger.info("Connecting to ElasticSearch persistence backend using 
cluster name " + clusterName + " and index prefix " + indexPrefix + "...");
-        client = new RestHighLevelClient(clientBuilder);
+        client = new CustomRestHighLevelClient(clientBuilder);
     }
 
     public BulkProcessor getBulkProcessor() {
@@ -989,59 +988,55 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         for (int i = 0; i < scripts.length; i++) {
             builtScripts[i] = new Script(ScriptType.INLINE, "painless", 
scripts[i], scriptParams[i]);
         }
-        return updateWithQueryAndScript(dateHint, clazz, builtScripts, 
conditions);
+
+        return updateWithQueryAndScript(dateHint, new Class<?>[]{clazz}, 
builtScripts, conditions, true);
     }
 
     @Override
     public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> 
clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] 
conditions) {
+        return updateWithQueryAndStoredScript(dateHint, new Class<?>[]{clazz}, 
scripts, scriptParams, conditions, true);
+    }
+
+    @Override
+    public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?>[] 
classes, String[] scripts, Map<String, Object>[] scriptParams, Condition[] 
conditions, boolean waitForComplete) {
         Script[] builtScripts = new Script[scripts.length];
         for (int i = 0; i < scripts.length; i++) {
             builtScripts[i] = new Script(ScriptType.STORED, null, scripts[i], 
scriptParams[i]);
         }
-        return updateWithQueryAndScript(dateHint, clazz, builtScripts, 
conditions);
+        return updateWithQueryAndScript(dateHint, classes, builtScripts, 
conditions, waitForComplete);
     }
 
-    private boolean updateWithQueryAndScript(final Date dateHint, final 
Class<?> clazz, final Script[] scripts, final Condition[] conditions) {
+    private boolean updateWithQueryAndScript(final Date dateHint, final 
Class<?>[] classes, final Script[] scripts, final Condition[] conditions, 
boolean waitForComplete) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
             protected Boolean execute(Object... args) throws Exception {
-                try {
-                    String itemType = Item.getItemType(clazz);
-
-                    String index = getIndexNameForQuery(itemType);
+                String[] itemTypes = 
Arrays.stream(classes).map(Item::getItemType).toArray(String[]::new);
+                String[] indices = Arrays.stream(itemTypes).map(itemType -> 
getIndexNameForQuery(itemType)).toArray(String[]::new);
 
+                try {
                     for (int i = 0; i < scripts.length; i++) {
-                        RefreshRequest refreshRequest = new 
RefreshRequest(index);
+                        RefreshRequest refreshRequest = new 
RefreshRequest(indices);
                         client.indices().refresh(refreshRequest, 
RequestOptions.DEFAULT);
 
-                        UpdateByQueryRequest updateByQueryRequest = new 
UpdateByQueryRequest(index);
+                        QueryBuilder queryBuilder = 
conditionESQueryBuilderDispatcher.buildFilter(conditions[i]);
+                        UpdateByQueryRequest updateByQueryRequest = new 
UpdateByQueryRequest(indices);
                         updateByQueryRequest.setConflicts("proceed");
                         updateByQueryRequest.setMaxRetries(1000);
                         updateByQueryRequest.setSlices(2);
                         updateByQueryRequest.setScript(scripts[i]);
-                        
updateByQueryRequest.setQuery(conditionESQueryBuilderDispatcher.buildFilter(conditions[i]));
-
-                        BulkByScrollResponse response = 
client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+                        updateByQueryRequest.setQuery(queryBuilder);
 
-                        if (response.getBulkFailures().size() > 0) {
-                            for (BulkItemResponse.Failure failure : 
response.getBulkFailures()) {
-                                logger.error("Failure : cause={} , 
message={}", failure.getCause(), failure.getMessage());
-                            }
+                        TaskSubmissionResponse taskResponse = 
client.submitUpdateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+                        if (taskResponse == null) {
+                            logger.error("update with query and script: no 
response returned for query: {}", queryBuilder);
+                        } else if (waitForComplete) {
+                            waitForTaskComplete(updateByQueryRequest, 
taskResponse);
                         } else {
-                            logger.info("Update with query and script 
processed {} entries in {}.", response.getUpdated(), 
response.getTook().toString());
-                        }
-                        if (response.isTimedOut()) {
-                            logger.error("Update with query and script ended 
with timeout!");
-                        }
-                        if (response.getVersionConflicts() > 0) {
-                            logger.warn("Update with query and script ended 
with {} version conflicts!", response.getVersionConflicts());
-                        }
-                        if (response.getNoops() > 0) {
-                            logger.warn("Update Bwith query and script ended 
with {} noops!", response.getNoops());
+                            logger.debug("ES task started {}", 
taskResponse.getTask());
                         }
                     }
                     return true;
                 } catch (IndexNotFoundException e) {
-                    throw new Exception("No index found for itemType=" + 
clazz.getName(), e);
+                    throw new Exception("No index found for itemTypes=" + 
String.join(",", itemTypes), e);
                 } catch (ScriptException e) {
                     logger.error("Error in the update script : {}\n{}\n{}", 
e.getScript(), e.getDetailedMessage(), e.getScriptStack());
                     throw new Exception("Error in the update script");
@@ -1055,6 +1050,53 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         }
     }
 
+    private void waitForTaskComplete(AbstractBulkByScrollRequest request, TaskSubmissionResponse response) { // polls ES until the task completes, is not found, or the wait times out
+        if (logger.isDebugEnabled()) {
+            logger.debug("Waiting task [{}]: [{}] using query: [{}], polling every {}ms with a timeout configured to {}ms",
+                    response.getTask(), request.toString(), request.getSearchRequest().source().query(), taskWaitingPollingInterval, taskWaitingTimeout);
+        }
+        long start = System.currentTimeMillis();
+        new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".waitForTask", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+            protected Void execute(Object... args) throws Exception {
+                // Poll the tasks API until the task completes, is no longer found, or taskWaitingTimeout elapses.
+                TaskId taskId = new TaskId(response.getTask());
+                while (true) {
+                    Optional<GetTaskResponse> getTaskResponseOptional = client.tasks().get(new GetTaskRequest(taskId.getNodeId(), taskId.getId()), RequestOptions.DEFAULT);
+                    if (getTaskResponseOptional.isPresent()) {
+                        GetTaskResponse getTaskResponse = getTaskResponseOptional.get();
+                        if (getTaskResponse.isCompleted()) {
+                            if (logger.isDebugEnabled()) {
+                                long millis = getTaskResponse.getTaskInfo().getRunningTimeNanos() / 1_000_000;
+                                long seconds = millis / 1000;
+                                // Report whole seconds for long tasks, milliseconds for sub-second ones.
+                                logger.debug("Waiting task [{}]: Finished in {} {}", taskId,
+                                        seconds >= 1 ? seconds : millis,
+                                        seconds >= 1 ? "seconds" : "milliseconds");
+                            }
+                            break;
+                        } else {
+                            if (taskWaitingTimeout > 0 && (start + taskWaitingTimeout) < System.currentTimeMillis()) { // <= 0 means no limit, as documented for taskWaitingTimeout
+                                logger.error("Waiting task [{}]: Exceeded configured timeout ({}ms), aborting wait process", taskId, taskWaitingTimeout);
+                                break;
+                            }
+                            // Not finished yet: wait one polling interval before asking Elasticsearch again.
+                            try {
+                                Thread.sleep(taskWaitingPollingInterval);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                throw new IllegalStateException("Waiting task [" + taskId + "]: interrupted", e);
+                            }
+                        }
+                    } else {
+                        logger.error("Waiting task [{}]: No task found", taskId);
+                        break;
+                    }
+                }
+                return null;
+            }
+        }.catchingExecuteInClassLoader(true);
+    }
+
     @Override
     public boolean storeScripts(Map<String, String> scripts) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".storeScripts", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
@@ -1161,8 +1203,9 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
             protected Boolean execute(Object... args) throws Exception {
                 try {
                     String itemType = Item.getItemType(clazz);
+                    QueryBuilder queryBuilder = 
conditionESQueryBuilderDispatcher.getQueryBuilder(query);
                     final DeleteByQueryRequest deleteByQueryRequest = new 
DeleteByQueryRequest(getIndexNameForQuery(itemType))
-                            
.setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
+                            .setQuery(queryBuilder)
                             // Setting slices to auto will let Elasticsearch 
choose the number of slices to use.
                             // This setting will use one slice per shard, up 
to a certain limit.
                             // The delete request will be more efficient and 
faster than no slicing.
@@ -1176,46 +1219,14 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                             // So we increase default timeout of 1min to 10min
                             
.setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
 
-                    BulkByScrollResponse bulkByScrollResponse = 
client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
+                    TaskSubmissionResponse taskResponse = 
client.submitDeleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
 
-                    if (bulkByScrollResponse == null) {
-                        logger.error("Remove by query: no response returned 
for query: {}", query);
+                    if (taskResponse == null) {
+                        logger.error("Remove by query: no response returned 
for query: {}", queryBuilder);
                         return false;
                     }
 
-                    if (bulkByScrollResponse.isTimedOut()) {
-                        logger.warn("Remove by query: timed out because took 
more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query);
-                    }
-
-                    if ((bulkByScrollResponse.getSearchFailures() != null && 
bulkByScrollResponse.getSearchFailures().size() > 0) ||
-                            bulkByScrollResponse.getBulkFailures() != null && 
bulkByScrollResponse.getBulkFailures().size() > 0) {
-                        logger.warn("Remove by query: we found some failure 
during the process of query: {}", query);
-
-
-                        if (bulkByScrollResponse.getSearchFailures() != null 
&& bulkByScrollResponse.getSearchFailures().size() > 0) {
-                            for (ScrollableHitSource.SearchFailure 
searchFailure : bulkByScrollResponse.getSearchFailures()) {
-                                logger.warn("Remove by query, search failure: 
{}", searchFailure.toString());
-                            }
-                        }
-
-                        if (bulkByScrollResponse.getBulkFailures() != null && 
bulkByScrollResponse.getBulkFailures().size() > 0) {
-                            for (BulkItemResponse.Failure bulkFailure : 
bulkByScrollResponse.getBulkFailures()) {
-                                logger.warn("Remove by query, bulk failure: 
{}", bulkFailure.toString());
-                            }
-                        }
-                    }
-
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Remove by query: took {}, deleted docs: 
{}, batches executed: {}, skipped docs: {}, version conflicts: {}, search 
retries: {}, bulk retries: {}, for query: {}",
-                                
bulkByScrollResponse.getTook().toHumanReadableString(1),
-                                bulkByScrollResponse.getDeleted(),
-                                bulkByScrollResponse.getBatches(),
-                                bulkByScrollResponse.getNoops(),
-                                bulkByScrollResponse.getVersionConflicts(),
-                                bulkByScrollResponse.getSearchRetries(),
-                                bulkByScrollResponse.getBulkRetries(),
-                                query);
-                    }
+                    waitForTaskComplete(deleteByQueryRequest, taskResponse);
 
                     return true;
                 } catch (Exception e) {
@@ -1230,7 +1241,6 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         }
     }
 
-
     public boolean indexTemplateExists(final String templateName) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".indexTemplateExists", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
             protected Boolean execute(Object... args) throws IOException {
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
 
b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
new file mode 100644
index 000000000..8fff8dea6
--- /dev/null
+++ 
b/persistence-elasticsearch/core/src/main/java/org/elasticsearch/client/CustomRestHighLevelClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.elasticsearch.client.tasks.TaskSubmissionResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+
+import java.io.IOException;
+
+import static java.util.Collections.emptySet;
+
+/**
+ * A custom Rest high level client that provides a way of using the Task system for updateByQuery and deleteByQuery,
+ * by returning as soon as the task is submitted (wait_for_completion set to false) instead of waiting for its result.
+ * See org.elasticsearch.client.RestHighLevelClient for the original code.
+ */
+public class CustomRestHighLevelClient extends RestHighLevelClient {
+
+    public CustomRestHighLevelClient(RestClientBuilder restClientBuilder) {
+        super(restClientBuilder);
+    }
+
+    /**
+     * Submits a delete by query request as an Elasticsearch task, without waiting for its completion.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
+     * Delete By Query API on elastic.co</a>
+     *
+     * @param deleteByQueryRequest the request
+     * @param options              the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the submission response, whose task id can be polled through the tasks API
+     */
+    public final TaskSubmissionResponse submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException {
+        return performRequestAndParseEntity(
+                deleteByQueryRequest, innerDeleteByQueryRequest -> {
+                    Request request = RequestConverters.deleteByQuery(innerDeleteByQueryRequest);
+                    request.addParameter("wait_for_completion", "false");
+                    return request;
+                }, options, TaskSubmissionResponse::fromXContent, emptySet()
+        );
+    }
+
+    /**
+     * Submits an update by query request as an Elasticsearch task, without waiting for its completion.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
+     * Update By Query API on elastic.co</a>
+     *
+     * @param updateByQueryRequest the request
+     * @param options              the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the submission response, whose task id can be polled through the tasks API
+     */
+    public final TaskSubmissionResponse submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException {
+        return performRequestAndParseEntity(
+                updateByQueryRequest, innerUpdateByQueryRequest -> {
+                    Request request = RequestConverters.updateByQuery(innerUpdateByQueryRequest); // convert the lambda argument, as submitDeleteByQuery does
+                    request.addParameter("wait_for_completion", "false");
+                    return request;
+                }, options, TaskSubmissionResponse::fromXContent, emptySet()
+        );
+    }
+}
diff --git 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 9c3800f76..bdf2dcc12 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -54,7 +54,9 @@
             <cm:property name="maximalElasticSearchVersion" value="8.0.0" />
 
             <cm:property name="aggregateQueryBucketSize" value="5000" />
-            <cm:property name="clientSocketTimeout" value="" />
+            <cm:property name="clientSocketTimeout" value="-1" />
+            <cm:property name="taskWaitingTimeout" value="3600000" />
+            <cm:property name="taskWaitingPollingInterval" value="1000" />
             <cm:property name="aggQueryMaxResponseSizeHttp" value="" />
             <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
             <cm:property name="itemTypeToRefreshPolicy" value="" />
@@ -131,6 +133,8 @@
         <property name="itemTypeToRefreshPolicy" 
value="${es.itemTypeToRefreshPolicy}" />
 
         <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" 
/>
+        <property name="taskWaitingTimeout" value="${es.taskWaitingTimeout}" />
+        <property name="taskWaitingPollingInterval" 
value="${es.taskWaitingPollingInterval}" />
 
         <property name="metricsService" ref="metricsService" />
         <property name="useBatchingForSave" value="${es.useBatchingForSave}" />
diff --git 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index b996c2a1c..5af2232e4 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -57,8 +57,22 @@ 
maximumIdsQueryCount=${org.apache.unomi.elasticsearch.maximumIdsQueryCount:-5000
 # Disable partitions on aggregation queries for past events.
 
pastEventsDisablePartitions=${org.apache.unomi.elasticsearch.pastEventsDisablePartitions:-false}
 
-# max socket timeout in millis
-clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
+# Defines the socket timeout (SO_TIMEOUT) in milliseconds, which is the timeout for waiting for data or, put differently, the maximum period of inactivity between two consecutive data packets.
+# A timeout value of zero is interpreted as an infinite timeout. A negative 
value is interpreted as undefined (system default).
+# Default: -1
+clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:--1}
+
+# Defines the waiting for task completion timeout in milliseconds.
+# Some operations, such as update_by_query and delete_by_query, are delegated to ElasticSearch as tasks.
+# For consistency, the thread that triggers one of those operations waits for the task to complete on the ElasticSearch side.
+# This timeout ensures that the thread is not blocked indefinitely by very long-running tasks.
+# A timeout value of zero or negative is interpreted as an infinite timeout.
+# Default: 3600000 (1 hour)
+taskWaitingTimeout=${org.apache.unomi.elasticsearch.taskWaitingTimeout:-3600000}
+
+# Defines the polling interval in milliseconds used to check whether a task has completed on the ElasticSearch side.
+# Default: 1000 (1 second)
+taskWaitingPollingInterval=${org.apache.unomi.elasticsearch.taskWaitingPollingInterval:-1000}
 
 # refresh policy per item type in Json.
 # Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is 
NONE.
diff --git 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index c4e62f66f..c4cea9943 100644
--- 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -205,6 +205,21 @@ public interface PersistenceService {
      */
     boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> clazz, 
String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions);
 
+    /**
+     * Updates the items of the specified classes that match the given conditions,
+     * using the provided stored scripts and script parameters.
+     * This variant can update multiple types in a single run; be careful with your query, as it will be applied to all of them.
+     *
+     * @param dateHint      a Date helping in identifying where the item is 
located
+     * @param classes      classes of items to update, be careful all of them 
will be submitted to update for all scripts/conditions
+     * @param scripts      Stored scripts name
+     * @param scriptParams script params array
+     * @param conditions   conditions array
+     * @param waitForComplete if true, wait for the ES execution to be complete
+     * @return {@code true} if the update was successful, {@code false} 
otherwise
+     */
+    boolean updateWithQueryAndStoredScript(Date dateHint, Class<?>[] classes, 
String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions, 
boolean waitForComplete);
+
     /**
      * Store script in the Database for later usage with 
updateWithQueryAndStoredScript function for example.
      *
diff --git 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index 3d90f6317..bef940e0d 100644
--- 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -201,8 +201,7 @@ public class MergeProfilesOnPropertyAction implements 
ActionExecutor {
                     Map<String, Object>[] scriptParams = new 
Map[]{Collections.singletonMap("profileId", masterProfileId)};
                     Condition[] conditions = new 
Condition[]{profileIdsCondition};
 
-                    persistenceService.updateWithQueryAndStoredScript(null, 
Session.class, scripts, scriptParams, conditions);
-                    persistenceService.updateWithQueryAndStoredScript(null, 
Event.class, scripts, scriptParams, conditions);
+                    persistenceService.updateWithQueryAndStoredScript(null, 
new Class[]{Session.class, Event.class}, scripts, scriptParams, conditions, 
false);
                 } else {
                     for (String mergedProfileId : mergedProfileIds) {
                         privacyService.anonymizeBrowsingData(mergedProfileId);


Reply via email to