sergehuber commented on code in PR #715:
URL: https://github.com/apache/unomi/pull/715#discussion_r1940893932


##########
persistence-opensearch/core/src/main/java/org/apache/unomi/persistence/opensearch/OpenSearchPersistenceServiceImpl.java:
##########
@@ -0,0 +1,2761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.persistence.opensearch;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.json.stream.JsonParser;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.util.EntityUtils;
+import org.apache.unomi.api.*;
+import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.query.DateRange;
+import org.apache.unomi.api.query.IpRange;
+import org.apache.unomi.api.query.NumericRange;
+import org.apache.unomi.metrics.MetricAdapter;
+import org.apache.unomi.metrics.MetricsService;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.apache.unomi.persistence.spi.aggregate.DateRangeAggregate;
+import org.apache.unomi.persistence.spi.aggregate.IpRangeAggregate;
+import org.apache.unomi.persistence.spi.aggregate.*;
+import org.apache.unomi.persistence.spi.conditions.ConditionContextHelper;
+import org.apache.unomi.persistence.spi.conditions.ConditionEvaluator;
+import 
org.apache.unomi.persistence.spi.conditions.ConditionEvaluatorDispatcher;
+import org.opensearch.client.*;
+import org.opensearch.client.json.JsonData;
+import org.opensearch.client.json.JsonpMapper;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
+import org.opensearch.client.opensearch.OpenSearchClient;
+import org.opensearch.client.opensearch._types.*;
+import org.opensearch.client.opensearch._types.aggregations.*;
+import org.opensearch.client.opensearch._types.mapping.TypeMapping;
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch.cluster.HealthRequest;
+import org.opensearch.client.opensearch.cluster.HealthResponse;
+import org.opensearch.client.opensearch.core.*;
+import org.opensearch.client.opensearch.core.bulk.BulkOperation;
+import org.opensearch.client.opensearch.core.bulk.UpdateOperation;
+import org.opensearch.client.opensearch.core.search.Hit;
+import org.opensearch.client.opensearch.core.search.HitsMetadata;
+import org.opensearch.client.opensearch.core.search.TotalHits;
+import org.opensearch.client.opensearch.core.search.TotalHitsRelation;
+import org.opensearch.client.opensearch.indices.*;
+import org.opensearch.client.opensearch.indices.get_alias.IndexAliases;
+import org.opensearch.client.opensearch.tasks.GetTasksResponse;
+import org.opensearch.client.transport.OpenSearchTransport;
+import org.opensearch.client.transport.rest_client.RestClientTransport;
+import org.osgi.framework.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.*;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.*;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("rawtypes")
+public class OpenSearchPersistenceServiceImpl implements PersistenceService, 
SynchronousBundleListener {
+
+    public static final String SEQ_NO = "seq_no";
+    public static final String PRIMARY_TERM = "primary_term";
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OpenSearchPersistenceServiceImpl.class.getName());
+    private static final String ROLLOVER_LIFECYCLE_NAME = 
"unomi-rollover-policy";
+
+    private boolean throwExceptions = false;
+
+    private OpenSearchClient client;
+    private RestClient restClient;
+
+    private final List<String> openSearchAddressList = new ArrayList<>();
+    private String clusterName;
+    private String indexPrefix;
+    private String monthlyIndexNumberOfShards;
+    private String monthlyIndexNumberOfReplicas;
+    private String monthlyIndexMappingTotalFieldsLimit;
+    private String monthlyIndexMaxDocValueFieldsSearch;
+    private String numberOfShards;
+    private String numberOfReplicas;
+    private String indexMappingTotalFieldsLimit;
+    private String indexMaxDocValueFieldsSearch;
+    private String[] fatalIllegalStateErrors;
+    private BundleContext bundleContext;
+    private final Map<String, String> mappings = new HashMap<>();
+    private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
+    private ConditionOSQueryBuilderDispatcher 
conditionOSQueryBuilderDispatcher;
+    private List<String> itemsMonthlyIndexed;
+    private Map<String, String> routingByType;
+
+    private Integer defaultQueryLimit = 10;
+    private Integer removeByQueryTimeoutInMinutes = 10;
+    private Integer taskWaitingTimeout = 3600000;
+    private Integer taskWaitingPollingInterval = 1000;
+
+    // Rollover configuration
+    private String sessionLatestIndex;
+    private List<String> rolloverIndices;
+    private String rolloverMaxSize;
+    private String rolloverMaxAge;
+    private String rolloverMaxDocs;
+    private String rolloverIndexNumberOfShards;
+    private String rolloverIndexNumberOfReplicas;
+    private String rolloverIndexMappingTotalFieldsLimit;
+    private String rolloverIndexMaxDocValueFieldsSearch;
+
+    private String minimalOpenSearchVersion = "2.1.0";
+    private String maximalOpenSearchVersion = "3.0.0";
+
+    // authentication props
+    private String username;
+    private String password;
+    private boolean sslEnable = false;
+    private boolean sslTrustAllCertificates = false;
+
+    private int aggregateQueryBucketSize = 5000;
+
+    private MetricsService metricsService;
+    private boolean useBatchingForSave = false;
+    private boolean useBatchingForUpdate = true;
+    private String logLevelRestClient = "ERROR";
+    private boolean alwaysOverwrite = true;
+    private boolean aggQueryThrowOnMissingDocs = false;
+    private Integer aggQueryMaxResponseSizeHttp = null;
+    private Integer clientSocketTimeout = null;
+    private Map<String, Refresh> itemTypeToRefreshPolicy = new HashMap<>();
+
+    private final Map<String, Map<String, Map<String, Object>>> knownMappings 
= new HashMap<>();
+
+    private static final Map<String, String> itemTypeIndexNameMap = new 
HashMap<>();
+    private static final Collection<String> systemItems = 
Arrays.asList("actionType", "campaign", "campaignevent", "goal",
+            "userList", "propertyType", "scope", "conditionType", "rule", 
"scoring", "segment", "groovyAction", "topic",
+            "patch", "jsonSchema", "importConfig", "exportConfig", 
"rulestats");
+    static {
+        for (String systemItem : systemItems) {
+            itemTypeIndexNameMap.put(systemItem, "systemItems");
+        }
+
+        itemTypeIndexNameMap.put("profile", "profile");
+        itemTypeIndexNameMap.put("persona", "profile");
+    }
+
+    private final JsonpMapper jsonpMapper = new JacksonJsonpMapper();
+
+    private String minimalClusterState = "GREEN"; // Add this as a class field
+    private int clusterHealthTimeout = 30; // timeout in seconds
+    private int clusterHealthRetries = 3;
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public void setOpenSearchAddresses(String openSearchAddresses) {
+        String[] openSearchAddressesArray = openSearchAddresses.split(",");
+        openSearchAddressList.clear();
+        for (String openSearchAddress : openSearchAddressesArray) {
+            openSearchAddressList.add(openSearchAddress.trim());
+        }
+    }
+
+    public void setItemTypeToRefreshPolicy(String itemTypeToRefreshPolicy) 
throws IOException {
+        if (!itemTypeToRefreshPolicy.isEmpty()) {
+            this.itemTypeToRefreshPolicy = new 
ObjectMapper().readValue(itemTypeToRefreshPolicy,
+                    new TypeReference<HashMap<String, Refresh>>() {
+                    });
+        }
+    }
+
+    public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
+        this.fatalIllegalStateErrors = 
Arrays.stream(fatalIllegalStateErrors.split(","))
+                .map(String::trim).filter(i -> 
!i.isEmpty()).toArray(String[]::new);
+    }
+
+    public void setAggQueryMaxResponseSizeHttp(String 
aggQueryMaxResponseSizeHttp) {
+        if (StringUtils.isNumeric(aggQueryMaxResponseSizeHttp)) {
+            this.aggQueryMaxResponseSizeHttp = 
Integer.parseInt(aggQueryMaxResponseSizeHttp);
+        }
+    }
+
+    public void setIndexPrefix(String indexPrefix) {
+        this.indexPrefix = indexPrefix;
+    }
+
+    @Deprecated
+    public void setMonthlyIndexNumberOfShards(String 
monthlyIndexNumberOfShards) {
+        this.monthlyIndexNumberOfShards = monthlyIndexNumberOfShards;
+    }
+
+    @Deprecated
+    public void setMonthlyIndexNumberOfReplicas(String 
monthlyIndexNumberOfReplicas) {
+        this.monthlyIndexNumberOfReplicas = monthlyIndexNumberOfReplicas;
+    }
+
+    @Deprecated
+    public void setMonthlyIndexMappingTotalFieldsLimit(String 
monthlyIndexMappingTotalFieldsLimit) {
+        this.monthlyIndexMappingTotalFieldsLimit = 
monthlyIndexMappingTotalFieldsLimit;
+    }
+
+    @Deprecated
+    public void setMonthlyIndexMaxDocValueFieldsSearch(String 
monthlyIndexMaxDocValueFieldsSearch) {
+        this.monthlyIndexMaxDocValueFieldsSearch = 
monthlyIndexMaxDocValueFieldsSearch;
+    }
+
+    @Deprecated
+    public void setItemsMonthlyIndexedOverride(String 
itemsMonthlyIndexedOverride) {
+        this.itemsMonthlyIndexed = 
StringUtils.isNotEmpty(itemsMonthlyIndexedOverride) ? 
Arrays.asList(itemsMonthlyIndexedOverride.split(",").clone()) : 
Collections.emptyList();
+    }
+
+    public void setNumberOfShards(String numberOfShards) {
+        this.numberOfShards = numberOfShards;
+    }
+
+    public void setNumberOfReplicas(String numberOfReplicas) {
+        this.numberOfReplicas = numberOfReplicas;
+    }
+
+    public void setIndexMappingTotalFieldsLimit(String 
indexMappingTotalFieldsLimit) {
+        this.indexMappingTotalFieldsLimit = indexMappingTotalFieldsLimit;
+    }
+
+    public void setIndexMaxDocValueFieldsSearch(String 
indexMaxDocValueFieldsSearch) {
+        this.indexMaxDocValueFieldsSearch = indexMaxDocValueFieldsSearch;
+    }
+
+    public void setDefaultQueryLimit(Integer defaultQueryLimit) {
+        this.defaultQueryLimit = defaultQueryLimit;
+    }
+
+    public void setRoutingByType(Map<String, String> routingByType) {
+        this.routingByType = routingByType;
+    }
+
+    public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher 
conditionEvaluatorDispatcher) {
+        this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher;
+    }
+
+    public void 
setConditionOSQueryBuilderDispatcher(ConditionOSQueryBuilderDispatcher 
conditionOSQueryBuilderDispatcher) {
+        this.conditionOSQueryBuilderDispatcher = 
conditionOSQueryBuilderDispatcher;
+    }
+
+    public void setRolloverIndices(String rolloverIndices) {
+        this.rolloverIndices = StringUtils.isNotEmpty(rolloverIndices) ? 
Arrays.asList(rolloverIndices.split(",").clone()) : null;
+    }
+
+    public void setRolloverMaxSize(String rolloverMaxSize) {
+        this.rolloverMaxSize = rolloverMaxSize;
+    }
+
+    public void setRolloverMaxAge(String rolloverMaxAge) {
+        this.rolloverMaxAge = rolloverMaxAge;
+    }
+
+    public void setRolloverMaxDocs(String rolloverMaxDocs) {
+        this.rolloverMaxDocs = rolloverMaxDocs;
+    }
+
+    public void setRolloverIndexNumberOfShards(String 
rolloverIndexNumberOfShards) {
+        this.rolloverIndexNumberOfShards = rolloverIndexNumberOfShards;
+    }
+
+    public void setRolloverIndexNumberOfReplicas(String 
rolloverIndexNumberOfReplicas) {
+        this.rolloverIndexNumberOfReplicas = rolloverIndexNumberOfReplicas;
+    }
+
+    public void setRolloverIndexMappingTotalFieldsLimit(String 
rolloverIndexMappingTotalFieldsLimit) {
+        this.rolloverIndexMappingTotalFieldsLimit = 
rolloverIndexMappingTotalFieldsLimit;
+    }
+
+    public void setRolloverIndexMaxDocValueFieldsSearch(String 
rolloverIndexMaxDocValueFieldsSearch) {
+        this.rolloverIndexMaxDocValueFieldsSearch = 
rolloverIndexMaxDocValueFieldsSearch;
+    }
+
+    public void setMinimalOpenSearchVersion(String minimalOpenSearchVersion) {
+        this.minimalOpenSearchVersion = minimalOpenSearchVersion;
+    }
+
+    public void setMaximalOpenSearchVersion(String maximalOpenSearchVersion) {
+        this.maximalOpenSearchVersion = maximalOpenSearchVersion;
+    }
+
+    public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
+        this.aggregateQueryBucketSize = aggregateQueryBucketSize;
+    }
+
+    public void setClientSocketTimeout(String clientSocketTimeout) {
+        if (StringUtils.isNumeric(clientSocketTimeout)) {
+            this.clientSocketTimeout = Integer.parseInt(clientSocketTimeout);
+        }
+    }
+
+    public void setMetricsService(MetricsService metricsService) {
+        this.metricsService = metricsService;
+    }
+
+    public void setUseBatchingForSave(boolean useBatchingForSave) {
+        this.useBatchingForSave = useBatchingForSave;
+    }
+
+    public void setUseBatchingForUpdate(boolean useBatchingForUpdate) {
+        this.useBatchingForUpdate = useBatchingForUpdate;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public void setSslEnable(boolean sslEnable) {
+        this.sslEnable = sslEnable;
+    }
+
+    public void setSslTrustAllCertificates(boolean sslTrustAllCertificates) {
+        this.sslTrustAllCertificates = sslTrustAllCertificates;
+    }
+
+
+    public void setAggQueryThrowOnMissingDocs(boolean 
aggQueryThrowOnMissingDocs) {
+        this.aggQueryThrowOnMissingDocs = aggQueryThrowOnMissingDocs;
+    }
+
+    public void setThrowExceptions(boolean throwExceptions) {
+        this.throwExceptions = throwExceptions;
+    }
+
+    public void setAlwaysOverwrite(boolean alwaysOverwrite) {
+        this.alwaysOverwrite = alwaysOverwrite;
+    }
+
+    public void setLogLevelRestClient(String logLevelRestClient) {
+        this.logLevelRestClient = logLevelRestClient;
+    }
+
+    public void setTaskWaitingTimeout(String taskWaitingTimeout) {
+        if (StringUtils.isNumeric(taskWaitingTimeout)) {
+            this.taskWaitingTimeout = Integer.parseInt(taskWaitingTimeout);
+        }
+    }
+
+    public void setTaskWaitingPollingInterval(String 
taskWaitingPollingInterval) {
+        if (StringUtils.isNumeric(taskWaitingPollingInterval)) {
+            this.taskWaitingPollingInterval = 
Integer.parseInt(taskWaitingPollingInterval);
+        }
+    }
+
+    public void setMinimalClusterState(String minimalClusterState) {
+        if ("GREEN".equalsIgnoreCase(minimalClusterState) || 
"YELLOW".equalsIgnoreCase(minimalClusterState)) {
+            this.minimalClusterState = minimalClusterState.toUpperCase();
+        } else {
+            LOGGER.warn("Invalid minimal cluster state: {}. Using default: 
GREEN", minimalClusterState);
+        }
+    }
+
+    public String getName() {
+        return "opensearch";
+    }
+
+    public void start() throws Exception {
+
+        // on startup
+        new InClassLoaderExecute<>(null, null, this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            public Object execute(Object... args) throws Exception {
+
+                buildClient();
+
+                InfoResponse response = client.info();
+                OpenSearchVersionInfo version = response.version();
+                Version clusterVersion = 
Version.parseVersion(version.number());
+                Version minimalVersion = 
Version.parseVersion(minimalOpenSearchVersion);
+                Version maximalVersion = 
Version.parseVersion(maximalOpenSearchVersion);
+                if (clusterVersion.compareTo(minimalVersion) < 0 ||
+                        clusterVersion.equals(maximalVersion) ||
+                        clusterVersion.compareTo(maximalVersion) > 0) {
+                    throw new Exception("OpenSearch version is not within [" + 
minimalVersion + "," + maximalVersion + "), aborting startup !");
+                }
+
+                waitForClusterHealth();
+
+                registerRolloverLifecyclePolicy();
+
+                loadPredefinedMappings(bundleContext, false);
+                loadPainlessScripts(bundleContext);
+
+                // load predefined mappings and condition dispatchers of any 
bundles that were started before this one.
+                for (Bundle existingBundle : bundleContext.getBundles()) {
+                    if (existingBundle.getBundleContext() != null) {
+                        
loadPredefinedMappings(existingBundle.getBundleContext(), false);
+                        loadPainlessScripts(existingBundle.getBundleContext());
+                    }
+                }
+
+                // Wait for minimal cluster state
+                LOGGER.info("Waiting for {} cluster status...", 
minimalClusterState);
+                client.cluster().health(new 
HealthRequest.Builder().waitForStatus(getHealthStatus(minimalClusterState)).build());
+                LOGGER.info("Cluster status is {}", minimalClusterState);
+
+                // We keep in memory the latest available session index to be 
able to load session using direct GET access on ES
+                if (isItemTypeRollingOver(Session.ITEM_TYPE)) {
+                    LOGGER.info("Sessions are using rollover indices, loading 
latest session index available ...");
+                    GetAliasResponse sessionAliasResponse = 
client.indices().getAlias(new 
GetAliasRequest.Builder().index(getIndex(Session.ITEM_TYPE)).build());
+                    Map<String, IndexAliases> aliases = 
sessionAliasResponse.result();
+                    if (!aliases.isEmpty()) {
+                        sessionLatestIndex = new 
TreeSet<>(aliases.keySet()).last();
+                        LOGGER.info("Latest available session index found is: 
{}", sessionLatestIndex);
+                    } else {
+                        throw new IllegalStateException("No index found for 
sessions");
+                    }
+                }
+
+                return true;
+            }
+        }.executeInClassLoader();
+
+        bundleContext.addBundleListener(this);
+
+        LOGGER.info(this.getClass().getName() + " service started 
successfully.");
+    }
+
+    private void buildClient() throws NoSuchFieldException, 
IllegalAccessException, NoSuchAlgorithmException, KeyStoreException, 
KeyManagementException {
+
+        List<Node> nodeList = new ArrayList<>();
+        for (String openSearchAddress : openSearchAddressList) {
+            String[] openSearchAddressParts = openSearchAddress.split(":");
+            String openSearchHostName = openSearchAddressParts[0];
+            int openSearchPort = Integer.parseInt(openSearchAddressParts[1]);
+
+            // configure authentication
+            nodeList.add(new Node(new HttpHost(openSearchHostName, 
openSearchPort, sslEnable ? "https" : "http")));
+        }
+
+        RestClientBuilder clientBuilder = 
RestClient.builder(nodeList.toArray(new Node[nodeList.size()]));
+
+        if (clientSocketTimeout != null) {
+            clientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
+                requestConfigBuilder.setSocketTimeout(clientSocketTimeout);
+                return requestConfigBuilder;
+            });
+        }
+
+        clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
+            if (sslTrustAllCertificates) {
+                try {
+                    final SSLContext sslContext = 
SSLContext.getInstance("SSL");
+                    sslContext.init(null, new TrustManager[]{new 
X509TrustManager() {
+                        public X509Certificate[] getAcceptedIssuers() {
+                            return null;
+                        }
+
+                        public void checkClientTrusted(X509Certificate[] certs,
+                                                       String authType) {
+                        }
+
+                        public void checkServerTrusted(X509Certificate[] certs,
+                                                       String authType) {
+                        }
+                    }}, new SecureRandom());
+
+                    
httpClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new 
NoopHostnameVerifier());
+                } catch (NoSuchAlgorithmException | KeyManagementException e) {
+                    LOGGER.error("Error creating SSL Context for trust all 
certificates", e);
+                }
+            }
+
+            if (StringUtils.isNotBlank(username)) {
+                final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+                credentialsProvider.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(username, password));
+
+                
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+            }
+
+            return httpClientBuilder;
+        });
+
+        restClient = clientBuilder.build();
+        OpenSearchTransport transport = new RestClientTransport(restClient, 
new JacksonJsonpMapper(OSCustomObjectMapper.getObjectMapper()));
+        client = new OpenSearchClient(transport);
+
+        LOGGER.info("Connecting to OpenSearch persistence backend using 
cluster name " + clusterName + " and index prefix " + indexPrefix + "...");
+    }
+
+
+    public void stop() {
+
+        new InClassLoaderExecute<>(null, null, this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Object execute(Object... args) throws IOException {
+                LOGGER.info("Closing OpenSearch persistence backend...");
+                if (client != null) {
+                    client.shutdown();
+                }
+                return null;
+            }
+        }.catchingExecuteInClassLoader(true);
+
+        bundleContext.removeBundleListener(this);
+    }
+
+    public void bindConditionEvaluator(ServiceReference<ConditionEvaluator> 
conditionEvaluatorServiceReference) {
+        ConditionEvaluator conditionEvaluator = 
bundleContext.getService(conditionEvaluatorServiceReference);
+        
conditionEvaluatorDispatcher.addEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString(),
 conditionEvaluator);
+    }
+
+    public void unbindConditionEvaluator(ServiceReference<ConditionEvaluator> 
conditionEvaluatorServiceReference) {
+        if (conditionEvaluatorServiceReference == null) {
+            return;
+        }
+        
conditionEvaluatorDispatcher.removeEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString());
+    }
+
+    public void 
bindConditionOSQueryBuilder(ServiceReference<ConditionOSQueryBuilder> 
conditionESQueryBuilderServiceReference) {
+        ConditionOSQueryBuilder conditionOSQueryBuilder = 
bundleContext.getService(conditionESQueryBuilderServiceReference);
+        
conditionOSQueryBuilderDispatcher.addQueryBuilder(conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString(),
 conditionOSQueryBuilder);
+    }
+
+    public void 
unbindConditionOSQueryBuilder(ServiceReference<ConditionOSQueryBuilder> 
conditionESQueryBuilderServiceReference) {
+        if (conditionESQueryBuilderServiceReference == null) {
+            return;
+        }
+        
conditionOSQueryBuilderDispatcher.removeQueryBuilder(conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString());
+    }
+
+    @Override
+    public void bundleChanged(BundleEvent event) {
+        if (event.getType() == BundleEvent.STARTING) {
+            loadPredefinedMappings(event.getBundle().getBundleContext(), true);
+            loadPainlessScripts(event.getBundle().getBundleContext());
+        }
+    }
+
+    private void loadPredefinedMappings(BundleContext bundleContext, boolean 
forceUpdateMapping) {
+        Enumeration<URL> predefinedMappings = 
bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true);
+        if (predefinedMappings == null) {
+            return;
+        }
+        while (predefinedMappings.hasMoreElements()) {
+            URL predefinedMappingURL = predefinedMappings.nextElement();
+            LOGGER.info("Found mapping at " + predefinedMappingURL + ", 
loading... ");
+            try {
+                final String path = predefinedMappingURL.getPath();
+                String name = path.substring(path.lastIndexOf('/') + 1, 
path.lastIndexOf('.'));
+                String mappingSource = loadMappingFile(predefinedMappingURL);
+
+                mappings.put(name, mappingSource);
+
+                if (!createIndex(name)) {
+                    LOGGER.info("Found index for type {}", name);
+                    if (forceUpdateMapping) {
+                        LOGGER.info("Updating mapping for {}", name);
+                        createMapping(name, mappingSource);
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.error("Error while loading mapping definition " + 
predefinedMappingURL, e);
+            }
+        }
+    }
+
+    private TypeMapping getTypeMapping(String mappingSource) {
+        JsonpMapper mapper = client._transport().jsonpMapper();
+        JsonParser parser = mapper
+                .jsonProvider()
+                .createParser(new StringReader(mappingSource));
+        return  TypeMapping._DESERIALIZER.deserialize(parser, mapper);
+    }
+
+    private void loadPainlessScripts(BundleContext bundleContext) {
+        Enumeration<URL> scriptsURL = 
bundleContext.getBundle().findEntries("META-INF/cxs/painless", "*.painless", 
true);
+        if (scriptsURL == null) {
+            return;
+        }
+
+        Map<String, String> scriptsById = new HashMap<>();
+        while (scriptsURL.hasMoreElements()) {
+            URL scriptURL = scriptsURL.nextElement();
+            LOGGER.info("Found painless script at " + scriptURL + ", 
loading... ");
+            try (InputStream in = scriptURL.openStream()) {
+                String script = IOUtils.toString(in, StandardCharsets.UTF_8);
+                String scriptId = 
FilenameUtils.getBaseName(scriptURL.getPath());
+                scriptsById.put(scriptId, script);
+            } catch (Exception e) {
+                LOGGER.error("Error while loading painless script " + 
scriptURL, e);
+            }
+
+        }
+
+        storeScripts(scriptsById);
+    }
+
+    private String  loadMappingFile(URL predefinedMappingURL) throws 
IOException {
+        BufferedReader reader = new BufferedReader(new 
InputStreamReader(predefinedMappingURL.openStream()));
+
+        StringBuilder content = new StringBuilder();
+        String l;
+        while ((l = reader.readLine()) != null) {
+            content.append(l);
+        }
+        return content.toString();
+    }
+
+    @Override
+    public <T extends Item> List<T> getAllItems(final Class<T> clazz) {
+        return getAllItems(clazz, 0, -1, null).getList();
+    }
+
+    @Override
+    public long getAllItemsCount(String itemType) {
+        return queryCount(Query.of(q -> q.matchAll(t -> t)), itemType);
+    }
+
+    @Override
+    public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, 
int offset, int size, String sortBy) {
+        return getAllItems(clazz, offset, size, sortBy, null);
+    }
+
+    @Override
+    public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, 
int offset, int size, String sortBy, String scrollTimeValidity) {
+        long startTime = System.currentTimeMillis();
+        try {
+            return query(Query.of(q -> q.matchAll(t -> t)), sortBy, clazz, 
offset, size, null, scrollTimeValidity);
+        } finally {
+            if (metricsService != null && metricsService.isActivated()) {
+                metricsService.updateTimer(this.getClass().getName() + 
".getAllItems", startTime);
+            }
+        }
+    }
+
+    @Override
+    public <T extends Item> T load(final String itemId, final Class<T> clazz) {
+        return load(itemId, clazz, null);
+    }
+
+    @Override
+    @Deprecated
+    public <T extends Item> T load(final String itemId, final Date dateHint, 
final Class<T> clazz) {
+        return load(itemId, clazz, null);
+    }
+
+    @Override
+    @Deprecated
+    public CustomItem loadCustomItem(final String itemId, final Date dateHint, 
String customItemType) {
+        return load(itemId, CustomItem.class, customItemType);
+    }
+
+    @Override
+    public CustomItem loadCustomItem(final String itemId, String 
customItemType) {
+        return load(itemId, CustomItem.class, customItemType);
+    }
+
+    private <T extends Item> T load(final String itemId, final Class<T> clazz, 
final String customItemType) {
+        if (StringUtils.isEmpty(itemId)) {
+            return null;
+        }
+
+        return new InClassLoaderExecute<T>(metricsService, 
this.getClass().getName() + ".loadItem", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected T execute(Object... args) throws Exception {
+                try {
+                    String itemType = Item.getItemType(clazz);
+                    if (customItemType != null) {
+                        itemType = customItemType;
+                    }
+                    String documentId = getDocumentIDForItemType(itemId, 
itemType);
+
+                    boolean sessionSpecialDirectAccess = sessionLatestIndex != 
null && Session.ITEM_TYPE.equals(itemType) ;
+                    if (!sessionSpecialDirectAccess && 
isItemTypeRollingOver(itemType)) {
+                        return new MetricAdapter<T>(metricsService, 
".loadItemWithQuery") {
+                            @Override
+                            public T execute(Object... args) throws Exception {
+                                if (customItemType == null) {
+                                    PartialList<T> r = query(Query.of(q -> 
q.ids(i -> i.values(documentId))), null, clazz, 0, 1, null, null);
+                                    if (r.size() > 0) {
+                                        return r.get(0);
+                                    }
+                                } else {
+                                    PartialList<CustomItem> r = 
query(Query.of(q -> q.ids(i -> i.values(documentId))), null, customItemType, 0, 
1, null, null);
+                                    if (r.size() > 0) {
+                                        return (T) r.get(0);
+                                    }
+                                }
+                                return null;
+                            }
+                        }.execute();
+                    } else {
+                        // Special handling for session we check the latest 
available index directly to speed up session loading
+                        GetRequest.Builder getRequest = new 
GetRequest.Builder().index(sessionSpecialDirectAccess ? sessionLatestIndex : 
getIndex(itemType)).id(documentId);
+                        GetResponse<T> response = 
client.get(getRequest.build(), clazz);
+                        if (response.found()) {
+                            T value = response.source();
+                            setMetadata(value, response.id(), 
response.version(), response.seqNo(), response.primaryTerm(), response.index());
+                            return value;
+                        } else {
+                            return null;
+                        }
+                    }
+                } catch (OpenSearchException ose) {
+                    if (ose.status() == 404) {
+                        // this can happen if we are just testing the 
existence of the item, it is not always an error.
+                        return null;
+                    }
+                    if ("IndexNotFound".equals(ose.error().type())) {
+                        // this can happen if we are just testing the 
existence of the item, it is not always an error.
+                        return null;
+                    }
+                    throw new Exception("Error loading itemType=" + 
clazz.getName() + " customItemType=" + customItemType + " itemId=" + itemId, 
ose);
+                } catch (Exception ex) {
+                    throw new Exception("Error loading itemType=" + 
clazz.getName() + " customItemType=" + customItemType + " itemId=" + itemId, 
ex);
+                }
+            }
+        }.catchingExecuteInClassLoader(true);
+
+    }
+
+    private void setMetadata(Item item, String itemId, long version, long 
seqNo, long primaryTerm, String index) {
+        if (!systemItems.contains(item.getItemType()) && item.getItemId() == 
null) {
+            item.setItemId(itemId);
+        }
+        item.setVersion(version);
+        item.setSystemMetadata(SEQ_NO, seqNo);
+        item.setSystemMetadata(PRIMARY_TERM, primaryTerm);
+        item.setSystemMetadata("index", index);
+    }
+
+    @Override
+    public boolean isConsistent(Item item) {
+        return getRefreshPolicy(item.getItemType()) != Refresh.False;
+    }
+
+    @Override
+    public boolean save(final Item item) {
+        return save(item, useBatchingForSave, alwaysOverwrite);
+    }
+
+    @Override
+    public boolean save(final Item item, final boolean useBatching) {
+        return save(item, useBatching, alwaysOverwrite);
+    }
+
+    @Override
+    public boolean save(final Item item, final Boolean useBatchingOption, 
final Boolean alwaysOverwriteOption) {
+        final boolean useBatching = useBatchingOption == null ? 
this.useBatchingForSave : useBatchingOption;
+        final boolean alwaysOverwrite = alwaysOverwriteOption == null ? 
this.alwaysOverwrite : alwaysOverwriteOption;
+
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".saveItem", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws Exception {
+                try {
+                    String itemType = item.getItemType();
+                    if (item instanceof CustomItem) {
+                        itemType = ((CustomItem) item).getCustomItemType();
+                    }
+                    String documentId = 
getDocumentIDForItemType(item.getItemId(), itemType);
+                    String index = item.getSystemMetadata("index") != null ? 
(String) item.getSystemMetadata("index") : getIndex(itemType);
+
+                    IndexRequest.Builder<Item> indexRequest = new 
IndexRequest.Builder<Item>().index(index);
+                    indexRequest.id(documentId);
+                    indexRequest.document(item);
+
+                    if (!alwaysOverwrite) {
+                        Long seqNo = (Long) item.getSystemMetadata(SEQ_NO);
+                        Long primaryTerm = (Long) 
item.getSystemMetadata(PRIMARY_TERM);
+
+                        if (seqNo != null && primaryTerm != null) {
+                            indexRequest.ifSeqNo(seqNo);
+                            indexRequest.ifPrimaryTerm(primaryTerm);
+                        } else {
+                            indexRequest.opType(OpType.Create);
+                        }
+                    }
+
+                    if (routingByType.containsKey(itemType)) {
+                        indexRequest.routing(routingByType.get(itemType));
+                    }
+
+                    try {
+                        indexRequest.refresh(getRefreshPolicy(itemType));
+                        IndexResponse response = 
client.index(indexRequest.build());
+                        String responseIndex = response.index();
+                        String itemId = response.id();
+                        setMetadata(item, itemId, response.version(), 
response.seqNo(), response.primaryTerm(), responseIndex);
+
+                        // Special handling for session, in case of new 
session we check that a rollover happen or not to update the latest available 
index
+                        if (Session.ITEM_TYPE.equals(itemType) &&
+                                sessionLatestIndex != null &&
+                                response.result().equals(Result.Created) &&
+                                !responseIndex.equals(sessionLatestIndex)) {
+                            sessionLatestIndex = responseIndex;
+                        }
+                        logMetadataItemOperation("saved", item);
+                    } catch (OpenSearchException ose) {
+                        LOGGER.error("Could not find index {}, could not 
register item type {} with id {} ", index, itemType, item.getItemId(), ose);
+                        return false;
+                    }
+                    return true;
+                } catch (IOException e) {
+                    throw new Exception("Error saving item " + item, e);
+                }
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    @Override
+    public boolean update(final Item item, final Date dateHint, final Class 
clazz, final String propertyName, final Object propertyValue) {
+        return update(item, clazz, propertyName, propertyValue);
+    }
+
+    @Override
+    public boolean update(final Item item, final Date dateHint, final Class 
clazz, final Map source) {
+        return update(item, clazz, source);
+    }
+
+    @Override
+    public boolean update(final Item item, final Date dateHint, final Class 
clazz, final Map source, final boolean alwaysOverwrite) {
+        return update(item, clazz, source, alwaysOverwrite);
+    }
+
+    @Override
+    public boolean update(final Item item, final Class clazz, final String 
propertyName, final Object propertyValue) {
+        return update(item, clazz, Collections.singletonMap(propertyName, 
propertyValue), alwaysOverwrite);
+    }
+
+
+    @Override
+    public boolean update(final Item item, final Class clazz, final Map 
source) {
+        return update(item, clazz, source, alwaysOverwrite);
+    }
+
+    @Override
+    public boolean update(final Item item, final Class clazz, final Map 
source, final boolean alwaysOverwrite) {
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".updateItem", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws Exception {
+                try {
+                    UpdateRequest updateRequest = createUpdateRequest(clazz, 
item, source, alwaysOverwrite);
+
+                    UpdateResponse response = client.update(updateRequest, 
Item.class);
+                    if (response.result().equals(Result.NoOp)) {
+                        LOGGER.warn("Update of item {} with source {} returned 
NoOp", item.getItemId(), source);
+                    }
+                    setMetadata(item, response.id(), response.version(), 
response.seqNo(), response.primaryTerm(), response.index());
+                    logMetadataItemOperation("updated", item);
+                    return true;
+                } catch (OpenSearchException ose) {
+                    throw new Exception("No index found for itemType=" + 
clazz.getName() + "itemId=" + item.getItemId(), ose);
+                }
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    private UpdateRequest createUpdateRequest(Class clazz, Item item, Map 
source, boolean alwaysOverwrite) {
+        String itemType = Item.getItemType(clazz);
+        String documentId = getDocumentIDForItemType(item.getItemId(), 
itemType);
+        String index =  item.getSystemMetadata("index") != null ? (String) 
item.getSystemMetadata("index") : getIndex(itemType);
+
+        UpdateRequest.Builder updateRequest = new UpdateRequest.Builder<Item, 
Map>().index(index).id(documentId);
+        updateRequest.doc(source);
+
+        if (!alwaysOverwrite) {
+            Long seqNo = (Long) item.getSystemMetadata(SEQ_NO);
+            Long primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM);
+
+            if (seqNo != null && primaryTerm != null) {
+                updateRequest.ifSeqNo(seqNo);
+                updateRequest.ifPrimaryTerm(primaryTerm);
+            }
+        }
+        return updateRequest.build();
+    }
+
+    private UpdateOperation createUpdateOperation(Class clazz, Item item, Map 
source, boolean alwaysOverwrite) {
+        String itemType = Item.getItemType(clazz);
+        String documentId = getDocumentIDForItemType(item.getItemId(), 
itemType);
+        String index =  item.getSystemMetadata("index") != null ? (String) 
item.getSystemMetadata("index") : getIndex(itemType);
+
+        UpdateOperation.Builder updateOperation = new 
UpdateOperation.Builder<Map>().index(index).id(documentId);
+        updateOperation.document(source);
+
+        if (!alwaysOverwrite) {
+            Long seqNo = (Long) item.getSystemMetadata(SEQ_NO);
+            Long primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM);
+
+            if (seqNo != null && primaryTerm != null) {
+                updateOperation.ifSeqNo(seqNo);
+                updateOperation.ifPrimaryTerm(primaryTerm);
+            }
+        }
+        return updateOperation.build();
+    }
+
+    @Override
+    public List<String> update(final Map<Item, Map> items, final Date 
dateHint, final Class clazz) {
+        if (items.isEmpty())
+            return new ArrayList<>();
+
+        return new InClassLoaderExecute<List<String>>(metricsService, 
OpenSearchPersistenceServiceImpl.this.getClass().getName() + ".updateItems", 
OpenSearchPersistenceServiceImpl.this.bundleContext, 
OpenSearchPersistenceServiceImpl.this.fatalIllegalStateErrors, throwExceptions) 
{
+            protected List<String> execute(Object... args) throws Exception {
+                long batchRequestStartTime = System.currentTimeMillis();
+
+                List<BulkOperation> operations = new ArrayList<>();
+                items.forEach((item, source) -> {
+                    UpdateOperation updateOperation = 
createUpdateOperation(clazz, item, source, alwaysOverwrite);
+                    operations.add(BulkOperation.of(b -> 
b.update(updateOperation)));
+                });
+
+                BulkResponse bulkResponse = client.bulk(b -> 
b.operations(operations));
+                LOGGER.debug("{} profiles updated with bulk segment in {}ms", 
operations.size(), System.currentTimeMillis() - batchRequestStartTime);
+
+                List<String> failedItemsIds = new ArrayList<>();
+
+                if (bulkResponse.errors()) {
+                    bulkResponse.items().forEach(bulkItemResponse -> {
+                        if (bulkItemResponse.error() != null) {
+                            failedItemsIds.add(bulkItemResponse.id());
+                        }
+                    });
+                }
+                return failedItemsIds;
+            }
+        }.catchingExecuteInClassLoader(true);
+    }
+
+    @Override
+    public boolean updateWithQueryAndScript(final Date dateHint, final 
Class<?> clazz, final String[] scripts, final Map<String, Object>[] 
scriptParams, final Condition[] conditions) {
+        return updateWithQueryAndScript(clazz, scripts, scriptParams, 
conditions);
+    }
+
+    @Override
+    public boolean updateWithQueryAndScript(final Class<?> clazz, final 
String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] 
conditions) {
+        Script[] builtScripts = new Script[scripts.length];
+        for (int i = 0; i < scripts.length; i++) {
+            final int finalI = i;
+            builtScripts[i] = Script.of(script -> 
script.inline(inline->inline.lang("painless").source(scripts[finalI]).params(convertParams(scriptParams[finalI]))));
+        }
+        return updateWithQueryAndScript(new Class<?>[]{clazz}, builtScripts, 
conditions, true);
+    }
+
+    private Map<String, JsonData> convertParams(Map<String, Object> 
scriptParams) {
+        Map<String,JsonData> jsonParams = new HashMap<>();
+        for (Map.Entry<String,Object> paramEntry : scriptParams.entrySet()) {
+            jsonParams.put(paramEntry.getKey(), 
JsonData.of(paramEntry.getValue(), jsonpMapper));
+        }
+        return jsonParams;
+    }
+
+    @Override
+    public boolean updateWithQueryAndStoredScript(Date dateHint, Class<?> 
clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] 
conditions) {
+        return updateWithQueryAndStoredScript(new Class<?>[]{clazz}, scripts, 
scriptParams, conditions, true);
+    }
+
+    @Override
+    public boolean updateWithQueryAndStoredScript(Class<?> clazz, String[] 
scripts, Map<String, Object>[] scriptParams, Condition[] conditions) {
+        return updateWithQueryAndStoredScript(new Class<?>[]{clazz}, scripts, 
scriptParams, conditions, true);
+    }
+
+    @Override
+    public boolean updateWithQueryAndStoredScript(Class<?>[] classes, String[] 
scripts, Map<String, Object>[] scriptParams, Condition[] conditions, boolean 
waitForComplete) {
+        Script[] builtScripts = new Script[scripts.length];
+        for (int i = 0; i < scripts.length; i++) {
+            final int finalI = i;
+            builtScripts[i] = Script.of(s -> s.stored(stored -> 
stored.id(scripts[finalI]).params(convertParams(scriptParams[finalI]))));
+        }
+        return updateWithQueryAndScript(classes, builtScripts, conditions, 
waitForComplete);
+    }
+
+    private boolean updateWithQueryAndScript(final Class<?>[] classes, final 
Script[] scripts, final Condition[] conditions, boolean waitForComplete) {
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws Exception {
+                String[] itemTypes = 
Arrays.stream(classes).map(Item::getItemType).toArray(String[]::new);
+                List<String> indices = Arrays.stream(itemTypes).map(itemType 
-> getIndexNameForQuery(itemType)).collect(Collectors.toList());
+
+                try {
+                    for (int i = 0; i < scripts.length; i++) {
+                        RefreshRequest refreshRequest = new 
RefreshRequest.Builder().index(indices).build();
+                        client.indices().refresh(refreshRequest);
+
+                        Query queryBuilder = 
conditionOSQueryBuilderDispatcher.buildFilter(conditions[i]);
+                        UpdateByQueryRequest.Builder 
updateByQueryRequestBuilder = new UpdateByQueryRequest.Builder().index(indices);
+                        
updateByQueryRequestBuilder.conflicts(Conflicts.Proceed);
+                        // TODO fix this 
updateByQueryRequest.setMaxRetries(1000);
+                        updateByQueryRequestBuilder.slices(2L);
+                        updateByQueryRequestBuilder.script(scripts[i]);
+                        
updateByQueryRequestBuilder.query(wrapWithItemsTypeQuery(itemTypes, 
queryBuilder));
+                        updateByQueryRequestBuilder.waitForCompletion(false); 
// force the return of a task ID.
+
+                        UpdateByQueryRequest updateByQueryRequest = 
updateByQueryRequestBuilder.build();
+                        UpdateByQueryResponse updateByQueryResponse = 
client.updateByQuery(updateByQueryRequest);
+                        if (updateByQueryResponse == null) {
+                            LOGGER.error("update with query and script: no 
response returned for query: {}", queryBuilder);
+                        } else if (waitForComplete) {
+                            
waitForTaskComplete(updateByQueryRequest.toString(), 
updateByQueryRequest.toString(), updateByQueryResponse.task());
+                        } else {
+                            LOGGER.debug("ES task started {}", 
updateByQueryResponse.task());
+                        }
+                    }
+                    return true;
+                } catch (OpenSearchException ose) {
+                    throw new Exception("No index found for itemTypes=" + 
String.join(",", itemTypes), ose);
+                    /* TODO Implement this
+                } catch (ScriptException e) {
+                    LOGGER.error("Error in the update script : {}\n{}\n{}", 
e.getScript(), e.getDetailedMessage(), e.getScriptStack());
+                    throw new Exception("Error in the update script");
+                     */
+                }
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    private void waitForTaskComplete(String request, String requestSource, 
String taskId) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Waiting task [{}]: [{}] using query: [{}], polling 
every {}ms with a timeout configured to {}ms",
+                    taskId, request, requestSource, 
taskWaitingPollingInterval, taskWaitingTimeout);
+        }
+        if (taskId == null) {
+            LOGGER.warn("No taskId provided, can't wait for task [{}]", 
request);
+            return;
+        }
+        long start = System.currentTimeMillis();
+        new InClassLoaderExecute<Void>(metricsService, 
this.getClass().getName() + ".waitForTask", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Void execute(Object... args) throws Exception {
+
+                while (true){
+                    GetTasksResponse getTasksResponse = client.tasks().get(t 
-> t.taskId(taskId));
+                    if (getTasksResponse.completed()) {
+                        if (LOGGER.isDebugEnabled()) {
+                            long millis = 
getTasksResponse.task().runningTimeInNanos() / 1_000_000;
+                            long seconds = millis / 1000;
+
+                            LOGGER.debug("Waiting task [{}]: Finished in {} 
{}", taskId,
+                                    seconds >= 1 ? seconds : millis,
+                                    seconds >= 1 ? "seconds" : "milliseconds");
+                        }
+                        break;
+                    } else {
+                        if ((start + taskWaitingTimeout) < 
System.currentTimeMillis()) {
+                            LOGGER.error("Waiting task [{}]: Exceeded 
configured timeout ({}ms), aborting wait process", taskId, taskWaitingTimeout);
+                            break;
+                        }
+
+                        try {
+                            Thread.sleep(taskWaitingPollingInterval);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new IllegalStateException("Waiting task 
[{}]: interrupted");
+                        }
+                    }
+                }
+                return null;
+            }
+        }.catchingExecuteInClassLoader(true);
+    }
+
+    @Override
+    public boolean storeScripts(Map<String, String> scripts) {
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".storeScripts", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws Exception {
+                boolean executedSuccessfully = true;
+                for (Map.Entry<String, String> script : scripts.entrySet()) {
+                    PutScriptRequest.Builder putScriptRequestBuilder = new 
PutScriptRequest.Builder();
+                    putScriptRequestBuilder.script(s -> 
s.lang("painless").source(script.getValue()));
+                    putScriptRequestBuilder.id(script.getKey());
+                    PutScriptResponse response = 
client.putScript(putScriptRequestBuilder.build());
+                    executedSuccessfully &= response.acknowledged();
+                    if (response.acknowledged()) {
+                        LOGGER.info("Successfully stored painless script: {}", 
script.getKey());
+                    } else {
+                        LOGGER.error("Failed to store painless script: {}", 
script.getKey());
+                    }
+                }
+                return executedSuccessfully;
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    public boolean updateWithScript(final Item item, final Date dateHint, 
final Class<?> clazz, final String script, final Map<String, Object> 
scriptParams) {
+        return updateWithScript(item, clazz, script, scriptParams);
+    }
+
+    @Override
+    public boolean updateWithScript(final Item item, final Class<?> clazz, 
final String script, final Map<String, Object> scriptParams) {
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".updateWithScript", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws Exception {
+                try {
+                    String itemType = Item.getItemType(clazz);
+                    String index = getIndex(itemType);
+                    String documentId = 
getDocumentIDForItemType(item.getItemId(), itemType);
+
+                    Script actualScript = Script.of(s -> s.inline(i -> 
i.lang("painless").source(script).params(convertParams(scriptParams))));
+
+                    UpdateRequest.Builder<Item, Map> updateRequestBuilder = 
new UpdateRequest.Builder<Item, Map>().index(index).id(documentId);
+
+                    Long seqNo = (Long) item.getSystemMetadata(SEQ_NO);
+                    Long primaryTerm = (Long) 
item.getSystemMetadata(PRIMARY_TERM);
+
+                    if (seqNo != null && primaryTerm != null) {
+                        updateRequestBuilder.ifSeqNo(seqNo);
+                        updateRequestBuilder.ifPrimaryTerm(primaryTerm);
+                    }
+                    updateRequestBuilder.script(actualScript);
+                    UpdateResponse response = 
client.update(updateRequestBuilder.build(), Item.class);
+                    setMetadata(item, response.id(), response.version(), 
response.seqNo(), response.primaryTerm(), response.index());
+
+                    return true;
+                } catch (OpenSearchException ose) {
+                    throw new Exception("No index found for itemType=" + 
clazz.getName() + "itemId=" + item.getItemId(), ose);
+                }
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    @Override
+    public <T extends Item> boolean remove(final String itemId, final Class<T> 
clazz) {
+        return remove(itemId, clazz, null);
+    }
+
+    @Override
+    public boolean removeCustomItem(final String itemId, final String 
customItemType) {
+        return remove(itemId, CustomItem.class, customItemType);
+    }
+
+    private <T extends Item> boolean remove(final String itemId, final 
Class<T> clazz, String customItemType) {
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".removeItem", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws Exception {
+                try {
+                    String itemType = Item.getItemType(clazz);
+                    if (customItemType != null) {
+                        itemType = customItemType;
+                    }
+                    String documentId = getDocumentIDForItemType(itemId, 
itemType);
+                    String index = getIndex(itemType);
+
+                    DeleteRequest deleteRequest = 
DeleteRequest.of(d->d.index(index).id(documentId));
+                    client.delete(deleteRequest);
+                    if (MetadataItem.class.isAssignableFrom(clazz)) {
+                        LOGGER.info("Item of type {} with ID {} has been 
removed", customItemType != null ? customItemType : clazz.getSimpleName(), 
itemId);
+                    }
+                    return true;
+                } catch (Exception e) {
+                    throw new Exception("Cannot remove", e);
+                }
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    public <T extends Item> boolean removeByQuery(final Condition query, final 
Class<T> clazz) {
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".removeByQuery", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws Exception {
+                Query queryBuilder = 
conditionOSQueryBuilderDispatcher.getQueryBuilder(query);
+                return removeByQuery(queryBuilder, clazz);
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    public <T extends Item> boolean removeByQuery(Query queryBuilder, final 
Class<T> clazz) throws Exception {
+        try {
+            String itemType = Item.getItemType(clazz);
+            LOGGER.debug("Remove item of type {} using a query", itemType);
+            final DeleteByQueryRequest.Builder deleteByQueryRequestBuilder = 
new DeleteByQueryRequest.Builder().index(getIndexNameForQuery(itemType))
+                    .query(wrapWithItemTypeQuery(itemType, queryBuilder))
+                    // Setting slices to auto will let OpenSearch choose the 
number of slices to use.
+                    // This setting will use one slice per shard, up to a 
certain limit.
+                    // The delete request will be more efficient and faster 
than no slicing.
+                    .slices(0L) // 0L means auto
+                    // OpenSearch takes a snapshot of the index when you hit 
delete by query request and uses the _version of the documents to process the 
request.
+                    // If a document gets updated in the meantime, it will 
result in a version conflict error and the delete operation will fail.
+                    // So we explicitly set the conflict strategy to proceed 
in case of version conflict.
+                    .conflicts(Conflicts.Proceed)
+                    // We force waitForCompletion to the value false to make 
sure we get back a taskID that we can then poll for
+                    // in our waitForTaskComplete method
+                    .waitForCompletion(false)
+                    // Remove by Query is mostly used for purge and cleaning 
up old data
+                    // It's mostly used in jobs/timed tasks so we don't really 
care about long request
+                    // So we increase default timeout of 1min to 10min
+                    .timeout(t -> t.time(removeByQueryTimeoutInMinutes + "m"));
+
+            DeleteByQueryRequest deleteByQueryRequest = 
deleteByQueryRequestBuilder.build();
+            DeleteByQueryResponse deleteByQueryResponse = 
client.deleteByQuery(deleteByQueryRequest);
+
+            if (deleteByQueryResponse == null) {
+                LOGGER.error("Remove by query: no response returned for query: 
{}", queryBuilder);
+                return false;
+            }
+
+            waitForTaskComplete(deleteByQueryRequest.toString(), 
deleteByQueryRequest.toString(), deleteByQueryResponse.task());
+
+            return true;
+        } catch (Exception e) {
+            throw new Exception("Cannot remove by query", e);
+        }
+    }
+
+    public boolean indexTemplateExists(final String templateName) {
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".indexTemplateExists", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws IOException {
+                return client.indices().existsTemplate(e -> 
e.name(templateName)).value();
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    public boolean removeIndexTemplate(final String templateName) {
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".removeIndexTemplate", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws IOException {
+                DeleteTemplateResponse deleteTemplateResponse = 
client.indices().deleteTemplate(d -> d.name(templateName));
+                return deleteTemplateResponse.acknowledged();
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    public boolean registerRolloverLifecyclePolicy() {
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".createMonthlyIndexLifecyclePolicy", 
this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws IOException {
+                try {
+                    String policyName = indexPrefix + 
"-rollover-lifecycle-policy";
+                    String endpoint = "_plugins/_ism/policies/" + policyName;
+
+                    RestClient restClient = ((RestClientTransport) 
client._transport()).restClient();
+
+                    // Upon initial OpenSearch startup, the 
.opendistro-ism-config index may not exist yet, so we need to check if it 
exists first
+                    // Check if the .opendistro-ism-config index exists
+                    Request checkIndexRequest = new Request("HEAD", 
".opendistro-ism-config");
+                    Response checkIndexResponse = 
restClient.performRequest(checkIndexRequest);
+
+                    if (checkIndexResponse.getStatusLine().getStatusCode() == 
404) {
+                        LOGGER.info(".opendistro-ism-config index does not 
exist. Initializing ISM configuration.");
+                    } else {
+                        Request getRequest = new Request("GET", endpoint);
+                        Response response = 
restClient.performRequest(getRequest);
+                        if (response.getStatusLine().getStatusCode() == 200) {
+                            LOGGER.info("Found existing rollover lifecycle 
policy, deleting the existing one.");
+                            Request deleteRequest = new Request("DELETE", 
endpoint);
+                            restClient.performRequest(deleteRequest);
+                        }
+                    }
+
+                    // Build the ILM policy JSON
+                    Map<String, Object> rolloverAction = new HashMap<>();
+
+                    if (rolloverMaxDocs != null && !rolloverMaxDocs.isEmpty()) 
{
+                        rolloverAction.put("min_doc_count", 
Long.parseLong(rolloverMaxDocs));
+                    }
+                    if (rolloverMaxSize != null && !rolloverMaxSize.isEmpty()) 
{
+                        rolloverAction.put("min_size", rolloverMaxSize);
+                    }
+                    if (rolloverMaxAge != null && !rolloverMaxAge.isEmpty()) {
+                        rolloverAction.put("min_index_age", rolloverMaxAge);
+                    }
+
+                    List<Map<String,Object>> actions = new ArrayList<>();
+                    actions.add(Map.of("rollover", rolloverAction));
+                    Map<String, Object> state = new HashMap<>();
+                    state.put("name", "ingest");
+                    state.put("actions", actions);
+                    state.put("transitions", new ArrayList<>());
+                    List<Map<String,Object>> states = new ArrayList<>();
+                    states.add(state);
+                    Map<String, Object> policy = new HashMap<>();
+                    policy.put("states", states);
+                    policy.put("default_state", "ingest");
+                    policy.put("description", "Rollover lifecycle policy");
+                    if (rolloverIndices != null && !rolloverIndices.isEmpty()) 
{
+                        Map<String,Object> ismTemplate = new HashMap<>();
+                        List<String> indexPatterns = new ArrayList<>();
+                        indexPatterns.addAll(rolloverIndices);
+                        ismTemplate.put("index_patterns", indexPatterns);
+                        ismTemplate.put("priority", 100);
+                        policy.put("ism_template", ismTemplate);
+                    }
+                    Map<String,Object> policies = new HashMap<>();
+                    policies.put("policy", policy);
+
+                    // Convert the policy to JSON
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    String policyJson = 
objectMapper.writeValueAsString(policies);
+
+                    // Send the request
+
+                    Request request = new Request("PUT", endpoint);
+                    request.setJsonEntity(policyJson);
+                    Response response = restClient.performRequest(request);
+
+                    return response.getStatusLine().getStatusCode() == 200;
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    return false;
+                }
+            }
+        }.catchingExecuteInClassLoader(true);
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    public boolean createIndex(final String itemType) {
+        LOGGER.debug("Create index {}", itemType);
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".createIndex", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws IOException {
+                String index = getIndex(itemType);
+                boolean indexExists = client.indices().exists(e -> 
e.index(index)).value();
+
+                if (!indexExists) {
+                    if (isItemTypeRollingOver(itemType)) {
+                        internalCreateRolloverTemplate(itemType);
+                        internalCreateRolloverIndex(index);
+                    } else {
+                        internalCreateIndex(index, mappings.get(itemType));
+                    }
+                }
+
+                return !indexExists;
+            }
+        }.catchingExecuteInClassLoader(true);
+
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    public boolean removeIndex(final String itemType) {
+        String index = getIndex(itemType);
+
+        Boolean result = new InClassLoaderExecute<Boolean>(metricsService, 
this.getClass().getName() + ".removeIndex", this.bundleContext, 
this.fatalIllegalStateErrors, throwExceptions) {
+            protected Boolean execute(Object... args) throws IOException {
+                boolean indexExists = client.indices().exists(e -> 
e.index(index)).value();
+                if (indexExists) {
+                    client.indices().delete(d -> d.index(index));
+                }
+                return indexExists;
+            }
+        }.catchingExecuteInClassLoader(true);
+
+        return Objects.requireNonNullElse(result, false);
+    }
+
+    private void internalCreateRolloverTemplate(String itemName) throws 
IOException {
+        String rolloverAlias = indexPrefix + "-" + itemName;
+        if (mappings.get(itemName) == null) {
+            LOGGER.warn("Couldn't find mapping for item {}, won't create 
monthly index template", itemName);
+            return;
+        }
+        String indexSource =
+                "    {" +
+                "        \"number_of_shards\" : " + 
StringUtils.defaultIfEmpty(rolloverIndexNumberOfShards, 
monthlyIndexNumberOfShards) + "," +
+                "        \"number_of_replicas\" : " + 
StringUtils.defaultIfEmpty(rolloverIndexNumberOfReplicas, 
monthlyIndexNumberOfReplicas) + "," +
+                "        \"mapping.total_fields.limit\" : " + 
StringUtils.defaultIfEmpty(rolloverIndexMappingTotalFieldsLimit, 
monthlyIndexMappingTotalFieldsLimit) + "," +
+                "        \"max_docvalue_fields_search\" : " + 
StringUtils.defaultIfEmpty(rolloverIndexMaxDocValueFieldsSearch, 
monthlyIndexMaxDocValueFieldsSearch) + "," +
+                "        \"plugins.index_state_management.rollover_alias\": 
\"" + rolloverAlias + "\"" +
+                "    },";
+        String analysisSource =
+                "    {" +
+                "      \"analyzer\": {" +
+                "        \"folding\": {" +
+                "          \"type\":\"custom\"," +
+                "          \"tokenizer\": \"keyword\"," +
+                "          \"filter\":  [ \"lowercase\", \"asciifolding\" ]" +
+                "        }\n" +
+                "      }\n" +
+                "    }\n";
+        Map<String, JsonData> settings = new HashMap<>();
+        settings.put("index", getJsonData(indexSource));
+        settings.put("analysis", getJsonData(analysisSource));
+        client.indices().putTemplate(p->
+                p.name(rolloverAlias + "-rollover-template")
+                        
.indexPatterns(Collections.singletonList(getRolloverIndexForQuery(itemName)))
+                        .order(1)
+                        .settings(settings)
+                        .mappings(getTypeMapping(mappings.get(itemName)))
+        );
+    }
+
+    private JsonData getJsonData(String input) {
+        JsonpMapper mapper = client._transport().jsonpMapper();
+        JsonParser parser = mapper
+                .jsonProvider()
+                .createParser(new 
ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
+        return  JsonData._DESERIALIZER.deserialize(parser, mapper);
+    }
+
+    private void internalCreateRolloverIndex(String indexName) throws 
IOException {
+        CreateIndexResponse createIndexResponse = client.indices().create(c->c
+                .index(indexName + "-000001")
+                .aliases(indexName, a->a.isWriteIndex(true)));
+        LOGGER.info("Index created: [{}], acknowledge: [{}], shards 
acknowledge: [{}]", createIndexResponse.index(),
+                createIndexResponse.acknowledged(), 
createIndexResponse.shardsAcknowledged());
+    }
+
+    private void internalCreateIndex(String indexName, String mappingSource) 
throws IOException {
+        CreateIndexResponse createIndexResponse = client.indices().create(c -> 
c
+                .index(indexName)
+                .settings(s -> s
+                        .index(i -> i
+                                .numberOfShards(numberOfShards)
+                                .numberOfReplicas(numberOfReplicas)
+                                .mapping(m -> m
+                                        .totalFields(t -> t
+                                                
.limit(Long.parseLong(indexMappingTotalFieldsLimit))
+                                        )
+                                )
+                                
.maxDocvalueFieldsSearch(Integer.parseInt(indexMaxDocValueFieldsSearch))
+                        )
+                        .analysis(a -> a
+                                .analyzer("folding", an -> an
+                                        .custom(cu -> cu
+                                                .tokenizer("keyword")
+                                                .filter("lowercase", 
"asciifolding")
+                                        )
+                                )
+                        )
+                )
+                .mappings(getTypeMapping(mappingSource))
+        );
+        LOGGER.info("Index created: [{}], acknowledge: [{}], shards 
acknowledge: [{}]", createIndexResponse.index(),
+                createIndexResponse.acknowledged(), 
createIndexResponse.shardsAcknowledged());
+    }
+
+    @Override
+    public void createMapping(String type, String source) {
+        try {
+            putMapping(source, getIndex(type));
+        } catch (IOException ioe) {
+            LOGGER.error("Error while creating mapping for type " + type + " 
and source " + source, ioe);
+        }
+    }
+
+    public void setPropertyMapping(final PropertyType property, final String 
itemType) {
+        try {
+            Map<String, Map<String, Object>> mappings = 
getPropertiesMapping(itemType);
+            if (mappings == null) {
+                mappings = new HashMap<>();
+            }
+            Map<String, Object> subMappings = 
mappings.computeIfAbsent("properties", k -> new HashMap<>());
+            Map<String, Object> subSubMappings = (Map<String, Object>) 
subMappings.computeIfAbsent("properties", k -> new HashMap<>());
+
+            if (subSubMappings.containsKey(property.getItemId())) {
+                LOGGER.warn("Mapping already exists for type " + itemType + " 
and property " + property.getItemId());
+                return;
+            }
+
+            Map<String, Object> propertyMapping = 
createPropertyMapping(property);
+            if (propertyMapping.isEmpty()) {
+                return;
+            }
+
+            mergePropertiesMapping(subSubMappings, propertyMapping);
+
+            Map<String, Object> mappingsWrapper = new HashMap<>();
+            mappingsWrapper.put("properties", mappings);
+            final String mappingsSource = 
OSCustomObjectMapper.getObjectMapper().writeValueAsString(mappingsWrapper);
+
+            putMapping(mappingsSource, getIndex(itemType));
+        } catch (IOException ioe) {
+            LOGGER.error("Error while creating mapping for type " + itemType + 
" and property " + property.getValueTypeId(), ioe);
+        }
+    }
+
+    private Map<String, Object> createPropertyMapping(final PropertyType 
property) {
+        final String esType = 
convertValueTypeToESType(property.getValueTypeId());
+        final HashMap<String, Object> definition = new HashMap<>();
+
+        if (esType == null) {
+            LOGGER.warn("No predefined type found for property[{}], no mapping 
will be created", property.getValueTypeId());
+            return Collections.emptyMap();
+        } else {
+            definition.put("type", esType);
+            if ("text".equals(esType)) {
+                definition.put("analyzer", "folding");
+                final Map<String, Object> fields = new HashMap<>();
+                final Map<String, Object> keywordField = new HashMap<>();
+                keywordField.put("type", "keyword");
+                keywordField.put("ignore_above", 256);
+                fields.put("keyword", keywordField);
+                definition.put("fields", fields);
+            }
+        }
+
+        if ("set".equals(property.getValueTypeId())) {
+            Map<String, Object> childProperties = new HashMap<>();
+            property.getChildPropertyTypes().forEach(childType -> {
+                Map<String, Object> propertyMapping = 
createPropertyMapping(childType);
+                if (!propertyMapping.isEmpty()) {
+                    mergePropertiesMapping(childProperties, propertyMapping);
+                }
+            });
+            definition.put("properties", childProperties);
+        }
+
+        return Collections.singletonMap(property.getItemId(), definition);
+    }
+
+    private String convertValueTypeToESType(String valueTypeId) {
+        switch (valueTypeId) {
+            case "set":
+            case "json":
+                return "object";
+            case "boolean":
+                return "boolean";
+            case "geopoint":
+                return "geo_point";
+            case "integer":
+                return "integer";
+            case "long":
+                return "long";
+            case "float":
+                return "float";
+            case "date":
+                return "date";
+            case "string":
+            case "id":
+            case "email": // TODO Consider supporting email mapping in ES, 
right now will be map to text to avoid warning in logs

Review Comment:
   ```suggestion
               case "email": // TODO Consider supporting email mapping in OS, 
right now will be map to text to avoid warning in logs
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@unomi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to