This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch UNOMI-225-ES7 in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/UNOMI-225-ES7 by this push: new 30f7ff1 UNOMI-225 ElasticSearch 7 support - Rewrote all the code in the persistence service - Working on getting the import package statements right, probably still some problems in them - Karaf starts but Unomi doesn't start properly yet 30f7ff1 is described below commit 30f7ff13ae35d87aa2ebafdf68b466509c4c4e7a Author: Serge Huber <shu...@apache.org> AuthorDate: Mon Nov 18 22:25:48 2019 +0100 UNOMI-225 ElasticSearch 7 support - Rewrote all the code in the persistence service - Working on getting the import package statements right, probably still some problems in them - Karaf starts but Unomi doesn't start properly yet Signed-off-by: Serge Huber <shu...@apache.org> --- persistence-elasticsearch/core/pom.xml | 184 ++++--- .../ElasticSearchPersistenceServiceImpl.java | 606 +++++++++------------ .../ElasticsearchPersistenceTest.java | 17 +- .../conditions/PropertyConditionEvaluator.java | 23 +- plugins/request/pom.xml | 4 + pom.xml | 4 +- 6 files changed, 398 insertions(+), 440 deletions(-) diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml index 6f43269..69c43fd 100644 --- a/persistence-elasticsearch/core/pom.xml +++ b/persistence-elasticsearch/core/pom.xml @@ -64,6 +64,33 @@ <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> </dependency> + <!-- + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch-core</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch-secure-sm</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + 
<artifactId>elasticsearch-x-content</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch-geo</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + --> <dependency> <groupId>org.elasticsearch.test</groupId> <artifactId>framework</artifactId> @@ -196,88 +223,85 @@ <configuration> <instructions> <Import-Package> -<!-- com.google.protobuf;resolution:=optional,--> -<!-- com.twitter.util;resolution:=optional,--> -<!-- com.vividsolutions.jts.*;resolution:=optional,--> -<!-- javax.annotation;resolution:=optional,--> -<!-- javax.crypto;resolution:=optional,--> -<!-- javax.crypto.spec;resolution:=optional,--> -<!-- javax.net.ssl;resolution:=optional,--> -<!-- javax.security.auth.x500;resolution:=optional,--> -<!-- javax.security.cert;resolution:=optional,--> -<!-- javax.servlet;resolution:=optional,--> -<!-- javax.servlet.http;resolution:=optional,--> -<!-- javax.xml.bind;resolution:=optional,--> -<!-- javax.xml.parsers;resolution:=optional,--> -<!-- javax.xml.transform;resolution:=optional,--> -<!-- javax.xml.transform.dom;resolution:=optional,--> -<!-- javax.xml.transform.stream;resolution:=optional,--> -<!-- org.apache.commons.logging;resolution:=optional,--> -<!-- org.apache.regexp;resolution:=optional,--> -<!-- org.apache.tomcat.jni;resolution:=optional,--> -<!-- org.bouncycastle.*;resolution:=optional,--> -<!-- org.eclipse.jetty.npn;resolution:=optional,--> -<!-- org.jboss.logging;resolution:=optional,--> -<!-- org.jboss.marshalling;resolution:=optional,--> -<!-- org.jruby;resolution:=optional,--> -<!-- org.jruby.embed;resolution:=optional,--> -<!-- sun.misc;resolution:=optional,--> -<!-- sun.security.util;resolution:=optional,--> -<!-- sun.security.x509;resolution:=optional,--> -<!-- com.google.protobuf.nano;resolution:=optional,--> -<!-- com.jcraft.jzlib;resolution:=optional,--> -<!-- com.ning.compress;resolution:=optional,--> -<!-- 
com.ning.compress.lzf;resolution:=optional,--> -<!-- com.ning.compress.lzf.util;resolution:=optional,--> -<!-- javassist;resolution:=optional,--> -<!-- lzma.sdk;resolution:=optional,--> -<!-- lzma.sdk.lzma;resolution:=optional,--> -<!-- net.jpountz.lz4;resolution:=optional,--> -<!-- net.jpountz.xxhash;resolution:=optional,--> -<!-- org.apache.tomcat;resolution:=optional,--> -<!-- org.eclipse.jetty.alpn;resolution:=optional,--> -<!-- org.joda.convert;resolution:=optional,--> -<!-- org.locationtech.spatial4j.context;resolution:=optional,--> -<!-- org.locationtech.spatial4j.context.jts;resolution:=optional,--> -<!-- org.locationtech.spatial4j.distance;resolution:=optional,--> -<!-- org.locationtech.spatial4j.exception;resolution:=optional,--> -<!-- org.locationtech.spatial4j.io;resolution:=optional,--> -<!-- org.locationtech.spatial4j.shape;resolution:=optional,--> -<!-- org.locationtech.spatial4j.shape.impl;resolution:=optional,--> -<!-- org.locationtech.spatial4j.shape.jts;resolution:=optional,--> -<!-- org.zeromq;resolution:=optional,--> -<!-- org.apache.commons.compress.compressors;resolution:=optional,--> -<!-- org.apache.commons.compress.utils;resolution:=optional,--> -<!-- org.apache.commons.csv;resolution:=optional,--> -<!-- org.apache.kafka.clients.producer;resolution:=optional,--> -<!-- javax.persistence;resolution:=optional,--> -<!-- com.google.errorprone.annotations.concurrent;resolution:=optional,--> -<!-- com.lmax.disruptor;resolution:=optional,--> -<!-- com.lmax.disruptor.dsl;resolution:=optional,--> -<!-- com.fasterxml.jackson.dataformat.xml;resolution:=optional,--> -<!-- com.fasterxml.jackson.dataformat.xml.annotation;resolution:=optional,--> -<!-- com.fasterxml.jackson.dataformat.xml.util;resolution:=optional,--> -<!-- io.netty.internal.tcnative;resolution:=optional,--> -<!-- org.conscrypt;resolution:=optional,--> - org.apache.unomi.api, - org.apache.unomi.api.rules, - org.apache.unomi.api.actions, - org.apache.unomi.api.campaigns, - 
org.apache.unomi.api.campaigns.events, - org.apache.unomi.api.conditions, - org.apache.unomi.api.goals, - org.apache.unomi.api.lists, - org.apache.unomi.api.query, - org.apache.unomi.api.segments, + com.carrotsearch.randomizedtesting;resolution:=optional, + com.fasterxml.jackson.*;resolution:=optional, + com.google.appengine.api;resolution:=optional, + com.google.apphosting.api;resolution:=optional, + com.google.common.geometry;resolution:=optional, + com.google.errorprone.annotations.concurrent;resolution:=optional, + com.hazelcast.core;version="[3.4,4)";resolution:=optional, + com.lmax.disruptor;resolution:=optional, + com.lmax.disruptor.dsl;resolution:=optional, + com.sun.management;resolution:=optional, + javax.activation;resolution:=optional, + javax.annotation;resolution:=optional, + javax.annotation.processing;resolution:=optional, + javax.crypto;resolution:=optional, + javax.crypto.spec;resolution:=optional, + javax.jms;resolution:=optional, + javax.lang.model;resolution:=optional, + javax.lang.model.element;resolution:=optional, + javax.lang.model.util;resolution:=optional, + javax.mail;resolution:=optional, + javax.mail.internet;resolution:=optional, + javax.mail.util;resolution:=optional, + javax.management;resolution:=optional, + javax.naming;resolution:=optional, + javax.naming.directory;resolution:=optional, + javax.naming.ldap;resolution:=optional, + javax.net;resolution:=optional, + javax.net.ssl;resolution:=optional, + javax.persistence;resolution:=optional, + javax.script;resolution:=optional, + javax.security.auth.x500;resolution:=optional, + javax.servlet;resolution:=optional, + javax.sql;resolution:=optional, + javax.tools;resolution:=optional, + javax.xml.bind;resolution:=optional, + javax.xml.parsers;resolution:=optional, + javax.xml.stream;resolution:=optional, + javax.xml.transform;resolution:=optional, + javax.xml.transform.sax;resolution:=optional, + javax.xml.transform.stream;resolution:=optional, + 
javax.xml.validation;resolution:=optional, + jdk.net;resolution:=optional, + org.apache.avalon.framework.logger;resolution:=optional, + org.apache.commons.compress.compressors;resolution:=optional, + org.apache.commons.compress.utils;resolution:=optional, + org.apache.commons.csv;resolution:=optional, + org.apache.kafka.clients.producer;resolution:=optional, + org.apache.log;resolution:=optional, + org.apache.unomi.api.conditions;version="[1.5,2)", + org.apache.unomi.api.query;version="[1.5,2)", + org.apache.unomi.api;version="[1.5,2)", + org.apache.unomi.metrics;version="[1.5,2)", + org.apache.unomi.persistence.spi.aggregate;version="[1.5,2)", + org.apache.unomi.persistence.spi;version="[1.5,2)", + org.codehaus.stax2;resolution:=optional, + org.elasticsearch.*;resolution:=optional, + org.ietf.jgss;resolution:=optional, + org.joda.convert;resolution:=optional, + org.locationtech.jts.geom;resolution:=optional, + org.locationtech.spatial4j.*;resolution:=optional, + org.osgi.framework.wiring;version="[1.0,2)", + org.osgi.framework;version="[1.6,2)", + org.osgi.service.blueprint;version="[1.0.0,2.0.0)", + org.slf4j;version="[1.7,2)", + org.w3c.dom;resolution:=optional, + org.xml.sax;resolution:=optional, + org.xml.sax.ext;resolution:=optional, + org.xml.sax.helpers;resolution:=optional, + org.zeromq;resolution:=optional, + sun.misc;resolution:=optional, * </Import-Package> -<!-- <Export-Package>--> -<!-- org.elasticsearch.*;version="${elasticsearch.version}",--> -<!-- org.elasticsearch.index.query.*;version="${elasticsearch.version}",--> -<!-- org.apache.unomi.persistence.elasticsearch.conditions;version="${project.version}"--> -<!-- </Export-Package>--> -<!-- <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>--> -<!-- <Embed-Transitive>true</Embed-Transitive>--> + <Export-Package> + org.elasticsearch.*;version="${elasticsearch.version}", + org.elasticsearch.index.query.*;version="${elasticsearch.version}", + 
org.apache.unomi.persistence.elasticsearch.conditions;version="${project.version}" + </Export-Package> + <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency> + <Embed-Transitive>true</Embed-Transitive> </instructions> </configuration> </plugin> diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 8c0a500..d85ede2 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -17,82 +17,92 @@ package org.apache.unomi.persistence.elasticsearch; -import static org.elasticsearch.index.query.QueryBuilders.termQuery; - import com.hazelcast.core.HazelcastInstance; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.URL; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.unomi.api.Item; import org.apache.unomi.api.PartialList; import org.apache.unomi.api.TimestampedItem; import org.apache.unomi.api.conditions.Condition; +import org.apache.unomi.api.query.DateRange; +import org.apache.unomi.api.query.IpRange; +import org.apache.unomi.api.query.NumericRange; import org.apache.unomi.metrics.MetricAdapter; import org.apache.unomi.metrics.MetricsService; -import 
org.apache.unomi.persistence.elasticsearch.conditions.ConditionContextHelper; -import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder; -import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher; -import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluator; -import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluatorDispatcher; +import org.apache.unomi.persistence.elasticsearch.conditions.*; import org.apache.unomi.persistence.spi.PersistenceService; -import org.apache.unomi.persistence.spi.aggregate.BaseAggregate; +import org.apache.unomi.persistence.spi.aggregate.*; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; +import org.elasticsearch.action.bulk.*; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Node; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.action.search.SearchScrollRequest; +import 
org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.*; import org.elasticsearch.client.core.MainResponse; -import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.*; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptException; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.*; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.filter.Filter; +import org.elasticsearch.search.aggregations.bucket.global.Global; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder; +import 
org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.range.IpRangeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.BundleEvent; -import org.osgi.framework.ServiceReference; -import org.osgi.framework.SynchronousBundleListener; +import org.osgi.framework.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.index.query.QueryBuilders.termQuery; + @SuppressWarnings("rawtypes") public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener { @@ -105,6 +115,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize"; public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval"; public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy"; + public static final String INDEX_DATE_PREFIX = "date-"; private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); private RestHighLevelClient client; private BulkProcessor bulkProcessor; @@ -303,26 +314,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, client = new RestHighLevelClient( RestClient.builder(nodeList.toArray(new Node[nodeList.size()]))); - // let's now check the versions of all the nodes in the cluster, to make sure they are as expected. MainResponse response = client.info(RequestOptions.DEFAULT); org.elasticsearch.client.core.MainResponse.Version version = response.getVersion(); - //TODO change check of version prerequisite -// try { -// org.elasticsearch.Version minimalVersion = org.elasticsearch.Version.fromString(minimalElasticSearchVersion); -// org.elasticsearch.Version maximalVersion = org.elasticsearch.Version.fromString(maximalElasticSearchVersion); -// for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) { -// org.elasticsearch.Version version = nodeInfo.getVersion(); -// if (version.before(minimalVersion) || -// version.equals(maximalVersion) || -// version.after(maximalVersion)) { -// throw new Exception("ElasticSearch version on node " + nodeInfo.getHostname() + " is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !"); -// } -// } -// } catch (InterruptedException e) { -// throw new Exception("Error checking ElasticSearch versions", e); -// } catch (ExecutionException e) { -// throw new Exception("Error checking ElasticSearch versions", e); -// } + org.elasticsearch.Version clusterVersion = org.elasticsearch.Version.fromString(version.getNumber()); + org.elasticsearch.Version minimalVersion = org.elasticsearch.Version.fromString(minimalElasticSearchVersion); + org.elasticsearch.Version maximalVersion = org.elasticsearch.Version.fromString(maximalElasticSearchVersion); + if (clusterVersion.before(minimalVersion) || + clusterVersion.equals(maximalVersion) || + clusterVersion.after(maximalVersion)) { + throw new 
Exception("ElasticSearch version is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !"); + } loadPredefinedMappings(bundleContext, false); @@ -349,7 +350,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } if (!indexExists) { logger.info("{} index doesn't exist yet, creating it...", indexName); - Map<String, String> indexMappings = new HashMap<String, String>(); + Map<String, Object> indexMappings = new HashMap<>(); indexMappings.put("_default_", mappings.get("_default_")); for (Map.Entry<String, String> entry : mappings.entrySet()) { if (!itemsMonthlyIndexed.contains(entry.getKey()) && !indexNames.containsKey(entry.getKey())) { @@ -523,12 +524,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - private String getMonthlyIndexName(Date date) { - String d = new SimpleDateFormat("-yyyy-MM").format(date); - String monthlyIndexName = indexName + d; - return monthlyIndexName; - } - private void loadPredefinedMappings(BundleContext bundleContext, boolean createMapping) { Enumeration<URL> predefinedMappings = bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true); if (predefinedMappings == null) { @@ -610,18 +605,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } }.execute(); } else { - String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) ? 
getMonthlyIndexName(dateHint) : indexName); - - SearchRequest searchRequest = new SearchRequest(index); - searchRequest.searchType(itemType); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.idsQuery().addIds(itemId)); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - /* TODO implement ES7 - GetResponse response = client.prepareGet(index, itemType, itemId) - .execute() - .actionGet(); + GetRequest getRequest = new GetRequest(getIndex(itemType, dateHint), itemId); + GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); if (response.isExists()) { String sourceAsString = response.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); @@ -632,7 +617,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } else { return null; } - */ } } catch (IndexNotFoundException e) { // this can happen if we are just testing the existence of the item, it is not always an error. @@ -640,7 +624,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } catch (Exception ex) { throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, ex); } - return null;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); @@ -660,27 +643,24 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String itemType = item.getItemType(); String itemId = item.getItemId(); putInCache(itemId, item); - String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) ? 
getMonthlyIndexName(((TimestampedItem) item).getTimeStamp()) : indexName); - /* TODO implement ES7 - IndexRequestBuilder indexBuilder = client.prepareIndex(index, itemType, itemId) - .setSource(source, XContentType.JSON); + String index = getIndex(itemType, ((TimestampedItem) item).getTimeStamp() ); + IndexRequest indexRequest = new IndexRequest(index); + indexRequest.id(itemId); if (routingByType.containsKey(itemType)) { - indexBuilder = indexBuilder.setRouting(routingByType.get(itemType)); + indexRequest.routing(routingByType.get(itemType)); } try { if (bulkProcessor == null || !useBatching) { - indexBuilder.execute().actionGet(); + client.index(indexRequest, RequestOptions.DEFAULT); } else { - bulkProcessor.add(indexBuilder.request()); + bulkProcessor.add(indexRequest); } } catch (IndexNotFoundException e) { logger.error("Could not find index {}, could not register item type {} with id {} ", index, itemType, itemId, e); return false; } - */ return true; } catch (IOException e) { throw new Exception("Error saving item " + item, e); @@ -705,20 +685,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Boolean execute(Object... args) throws Exception { try { String itemType = Item.getItemType(clazz); - - String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? 
getMonthlyIndexName(dateHint) : indexName); - - /* TODO implement ES7 + UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), itemId); + updateRequest.doc(source); if (bulkProcessor == null) { - client.prepareUpdate(index, itemType, itemId).setDoc(source) - .execute() - .actionGet(); + client.update(updateRequest, RequestOptions.DEFAULT); } else { - UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setDoc(source).request(); bulkProcessor.add(updateRequest); } - */ return true; } catch (IndexNotFoundException e) { throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); @@ -739,20 +712,22 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); - String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName); + String index = getIndex(itemType, dateHint); for (int i = 0; i < scripts.length; i++) { Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]); - /* TODO implement ES7 - client.indices().prepareRefresh(index).get(); + RefreshRequest refreshRequest = new RefreshRequest(index); + client.indices().refresh(refreshRequest, RequestOptions.DEFAULT); - UpdateByQueryRequestBuilder ubqrb = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); - ubqrb.source(index).source().setTypes(itemType); - BulkByScrollResponse response = ubqrb.setSlices(2) - .setMaxRetries(1000).abortOnVersionConflict(false).script(actualScript) - .filter(conditionESQueryBuilderDispatcher.buildFilter(conditions[i])).get(); + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index); + updateByQueryRequest.setConflicts("proceed"); + updateByQueryRequest.setMaxRetries(1000); + updateByQueryRequest.setSlices(2); + updateByQueryRequest.setScript(actualScript); + 
updateByQueryRequest.setQuery(conditionESQueryBuilderDispatcher.buildFilter(conditions[i])); + + BulkByScrollResponse response = client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); if (response.getBulkFailures().size() > 0) { for (BulkItemResponse.Failure failure : response.getBulkFailures()) { @@ -770,7 +745,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (response.getNoops() > 0) { logger.warn("Update By Query ended with {} noops!", response.getNoops()); } - */ } return true; } catch (IndexNotFoundException e) { @@ -797,21 +771,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); - String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName); + String index = getIndex(itemType, dateHint); Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams); - /* TODO implement ES7 + UpdateRequest updateRequest = new UpdateRequest(index, itemId); + updateRequest.script(actualScript); if (bulkProcessor == null) { - client.prepareUpdate(index, itemType, itemId).setScript(actualScript) - .execute() - .actionGet(); + client.update(updateRequest, RequestOptions.DEFAULT); } else { - UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).request(); bulkProcessor.add(updateRequest); } - */ return true; } catch (IndexNotFoundException e) { @@ -833,10 +803,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); - /* TODO implement ES7 - client.prepareDelete(getIndexNameForQuery(itemType), itemType, itemId) - .execute().actionGet(); - */ + DeleteRequest deleteRequest = new DeleteRequest(getIndexNameForQuery(itemType), itemId); + client.delete(deleteRequest, RequestOptions.DEFAULT); 
return true; } catch (Exception e) { throw new Exception("Cannot remove", e); @@ -856,16 +824,18 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); - /* TODO implement ES7 - BulkRequestBuilder deleteByScope = client.prepareBulk(); + BulkRequest deleteByScopeBulkRequest = new BulkRequest(); final TimeValue keepAlive = TimeValue.timeValueHours(1); - SearchResponse response = client.prepareSearch(indexName + "*") - .setIndices(getIndexNameForQuery(itemType)) - .setTypes(itemType) - .setScroll(keepAlive) - .setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query)) - .setSize(100).execute().actionGet(); + SearchRequest searchRequest = new SearchRequest(indexName + "*") + .indices(getIndexNameForQuery(itemType)) + .scroll(keepAlive); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .query(conditionESQueryBuilderDispatcher.getQueryBuilder(query)) + .size(100); + searchRequest.source(searchSourceBuilder); + + SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); // Scroll until no more hits are returned while (true) { @@ -873,27 +843,31 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (SearchHit hit : response.getHits().getHits()) { // add hit to bulk delete deleteFromCache(hit.getId(), clazz); - deleteByScope.add(Requests.deleteRequest(hit.getIndex()).type(hit.getType()).id(hit.getId())); + deleteByScopeBulkRequest.add(Requests.deleteRequest(hit.getIndex()).type(hit.getType()).id(hit.getId())); } - response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet(); + SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId()); + searchScrollRequest.scroll(keepAlive); + response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT); // If we have no more hits, exit if (response.getHits().getHits().length == 0) { break; } } - 
client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet(); + + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(response.getScrollId()); + client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); // we're done with the scrolling, delete now - if (deleteByScope.numberOfActions() > 0) { - final BulkResponse deleteResponse = deleteByScope.get(); + if (deleteByScopeBulkRequest.numberOfActions() > 0) { + final BulkResponse deleteResponse = client.bulk(deleteByScopeBulkRequest, RequestOptions.DEFAULT); if (deleteResponse.hasFailures()) { // do something logger.debug("Couldn't remove by query " + query + ":\n{}", deleteResponse.buildFailureMessage()); } } - */ return true; } catch (Exception e) { @@ -911,12 +885,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean indexTemplateExists(final String templateName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists") { - protected Boolean execute(Object... args) { - /* TODO implement ES7 - GetIndexTemplatesResponse getIndexTemplatesResponse = client.indices().prepareGetTemplates(templateName).execute().actionGet(); - return getIndexTemplatesResponse.getIndexTemplates().size() == 1; - */ - return true;//TODO remove ES7 + protected Boolean execute(Object... 
args) throws IOException { + IndexTemplatesExistRequest indexTemplatesExistRequest = new IndexTemplatesExistRequest(templateName); + return client.indices().existsTemplate(indexTemplatesExistRequest, RequestOptions.DEFAULT); } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -928,12 +899,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean removeIndexTemplate(final String templateName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate") { - protected Boolean execute(Object... args) { - /* TODO implement ES7 - AcknowledgedResponse deleteIndexTemplateResponse = client.indices().deleteTemplate(new DeleteIndexTemplateRequest(templateName), null).actionGet(); + protected Boolean execute(Object... args) throws IOException { + DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(templateName); + AcknowledgedResponse deleteIndexTemplateResponse = client.indices().deleteTemplate(deleteIndexTemplateRequest, RequestOptions.DEFAULT); return deleteIndexTemplateResponse.isAcknowledged(); - */ - return true;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -945,10 +914,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean createMonthlyIndexTemplate() { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate") { - protected Boolean execute(Object... args) { - /* TODO implement ES7 + protected Boolean execute(Object... 
args) throws IOException { PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest("context-monthly-indices") - .template(indexName + "-*") + .patterns(Arrays.asList(indexName + "*" + INDEX_DATE_PREFIX + "*")) .settings("{\n" + " \"index\" : {\n" + " \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" + @@ -964,18 +932,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, " }\n" + " }\n" + "}\n", XContentType.JSON); - Map<String, String> indexMappings = new HashMap<String, String>(); + Map<String, Object> indexMappings = new HashMap<String, Object>(); indexMappings.put("_default_", mappings.get("_default_")); for (Map.Entry<String, String> entry : mappings.entrySet()) { if (itemsMonthlyIndexed.contains(entry.getKey())) { indexMappings.put(entry.getKey(), entry.getValue()); } } - putIndexTemplateRequest.mappings().putAll(indexMappings); + putIndexTemplateRequest.mapping(indexMappings); AcknowledgedResponse putIndexTemplateResponse = client.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT); return putIndexTemplateResponse.isAcknowledged(); - */ - return true;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -986,13 +952,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } public boolean createIndex(final String indexName) { - Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createItem") { - protected Boolean execute(Object... args) { - /* TODO implement ES7 - IndicesExistsResponse indicesExistsResponse = client.indices().prepareExists(indexName).execute().actionGet(); - boolean indexExists = indicesExistsResponse.isExists(); + Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex") { + protected Boolean execute(Object... 
args) throws IOException { + GetIndexRequest getIndexRequest = new GetIndexRequest(indexName); + boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT); if (!indexExists) { - Map<String, String> indexMappings = new HashMap<String, String>(); + Map<String, Object> indexMappings = new HashMap<String, Object>(); indexMappings.put("_default_", mappings.get("_default_")); for (Map.Entry<String, String> entry : mappings.entrySet()) { if (indexNames.containsKey(entry.getKey()) && indexNames.get(entry.getKey()).equals(indexName)) { @@ -1002,8 +967,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, internalCreateIndex(indexName, indexMappings); } return !indexExists; - */ - return true;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -1015,16 +978,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean removeIndex(final String indexName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex") { - protected Boolean execute(Object... args) { - /* TODO implement ES7 - IndicesExistsResponse indicesExistsResponse = client.indices().prepareExists(indexName).execute().actionGet(); - boolean indexExists = indicesExistsResponse.isExists(); + protected Boolean execute(Object... 
args) throws IOException { + GetIndexRequest getIndexRequest = new GetIndexRequest(indexName); + boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT); if (indexExists) { - client.indices().prepareDelete(indexName).execute().actionGet(); + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); + client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); } return indexExists; - */ - return true;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -1034,10 +995,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - private void internalCreateIndex(String indexName, Map<String, String> mappings) { - /* TODO implement ES7 - CreateIndexRequestBuilder builder = client.indices().prepareCreate(indexName) - .setSettings("{\n" + + private void internalCreateIndex(String indexName, Map<String, Object> mappings) throws IOException { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); + createIndexRequest.settings("{\n" + " \"index\" : {\n" + " \"number_of_shards\" : " + numberOfShards + ",\n" + " \"number_of_replicas\" : " + numberOfReplicas + "\n" + @@ -1053,23 +1013,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, " }\n" + "}\n", XContentType.JSON); - for (Map.Entry<String, String> entry : mappings.entrySet()) { - builder.addMapping(entry.getKey(), entry.getValue(), XContentType.JSON); - } - - builder.execute().actionGet(); - */ + createIndexRequest.mapping(mappings); + client.indices().create(createIndexRequest, RequestOptions.DEFAULT); } - private void createMapping(final String type, final String source, final String indexName) { - /* TODO implement ES7 - client.indices() - .preparePutMapping(indexName) - .setType(type) - .setSource(source, XContentType.JSON) - .execute().actionGet(); - */ + private void createMapping(final String type, final String source, final String indexName) 
throws IOException { + PutMappingRequest putMappingRequest = new PutMappingRequest(indexName); + putMappingRequest.source(source, XContentType.JSON); + client.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT); } @Override @@ -1077,20 +1029,24 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (type.equals("_default_")) { return; } - /* TODO implement ES7 - if (itemsMonthlyIndexed.contains(type)) { - createMonthlyIndexTemplate(); - if (client.indices().prepareExists(indexName + "-*").execute().actionGet().isExists()) { - createMapping(type, source, indexName + "-*"); - } - } else if (indexNames.containsKey(type)) { - if (client.indices().prepareExists(indexNames.get(type)).execute().actionGet().isExists()) { - createMapping(type, source, indexNames.get(type)); + try { + if (itemsMonthlyIndexed.contains(type)) { + createMonthlyIndexTemplate(); + GetIndexRequest getIndexRequest = new GetIndexRequest(indexName + "*" + INDEX_DATE_PREFIX + "*"); + if (client.indices().exists(getIndexRequest, RequestOptions.DEFAULT)) { + createMapping(type, source, indexName + "*" + INDEX_DATE_PREFIX + "*"); + } + } else if (indexNames.containsKey(type)) { + GetIndexRequest getIndexRequest = new GetIndexRequest(indexNames.get(type)); + if (client.indices().exists(getIndexRequest, RequestOptions.DEFAULT)) { + createMapping(type, source, indexNames.get(type)); + } + } else { + createMapping(type, source, indexName); } - } else { - createMapping(type, source, indexName); + } catch (IOException ioe) { + logger.error("Error while creating mapping for type " + type + " and source " + source, ioe); } - */ } @Override @@ -1099,22 +1055,19 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @SuppressWarnings("unchecked") protected Map<String, Map<String, Object>> execute(Object... 
args) throws Exception { // Get all mapping for current itemType - /* TODO implement ES7 - GetMappingsResponse getMappingsResponse = client.indices().prepareGetMappings().setTypes(itemType).execute().actionGet(); - ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getMappingsResponse.getMappings(); + GetMappingsRequest getMappingsRequest = new GetMappingsRequest(); + getMappingsRequest.indices(getIndex(itemType, null)); + GetMappingsResponse getMappingsResponse = client.indices().getMapping(getMappingsRequest, RequestOptions.DEFAULT); + Map<String, MappingMetaData> mappings = getMappingsResponse.mappings(); // create a list of Keys to get the mappings in chronological order // in case there is monthly context then the mapping will be added from the oldest to the most recent one - Set<String> orderedKeys = new TreeSet<>(Arrays.asList(mappings.keys().toArray(String.class))); - */ + Set<String> orderedKeys = new TreeSet<>(mappings.keySet()); Map<String, Map<String, Object>> result = new HashMap<>(); - /* TODO implement ES7 try { for (String key : orderedKeys) { if (mappings.containsKey(key)) { - ImmutableOpenMap<String, MappingMetaData> next = mappings.get(key); - - Map<String, Map<String, Object>> properties = (Map<String, Map<String, Object>>) next.get(itemType).getSourceAsMap().get("properties"); + Map<String, Map<String, Object>> properties = (Map<String, Map<String, Object>>) mappings.get(key).getSourceAsMap().get("properties"); for (Map.Entry<String, Map<String, Object>> entry : properties.entrySet()) { if (result.containsKey(entry.getKey())) { Map<String, Object> subResult = result.get(entry.getKey()); @@ -1137,7 +1090,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } catch (Throwable t) { throw new Exception("Cannot get mapping for itemType="+ itemType, t); } - */ return result; } }.catchingExecuteInClassLoader(true); @@ -1201,12 +1153,12 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService, //Index the query = register it in the percolator try { logger.info("Saving query : " + queryName); - /* TODO implement ES7 - client.prepareIndex(indexName, ".percolator", queryName) - .setSource(query, XContentType.JSON) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .execute().actionGet(); - */ + String index = getIndex(".percolator", null); + IndexRequest indexRequest = new IndexRequest(index); + indexRequest.id(queryName); + indexRequest.source(query, XContentType.JSON); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client.index(indexRequest, RequestOptions.DEFAULT); return true; } catch (Exception e) { throw new Exception("Cannot save query", e); @@ -1235,11 +1187,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Boolean execute(Object... args) throws Exception { //Index the query = register it in the percolator try { - /* TODO implement ES7 - client.prepareDelete(indexName, ".percolator", queryName) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .execute().actionGet(); - */ + String index = getIndex(".percolator", null); + DeleteRequest deleteRequest = new DeleteRequest(index); + deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client.delete(deleteRequest, RequestOptions.DEFAULT); return true; } catch (Exception e) { throw new Exception("Cannot delete query", e); @@ -1355,17 +1306,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount") { @Override - protected Long execute(Object... 
args) { - /* TODO implement ES7 - SearchResponse response = client.prepareSearch(getIndexNameForQuery(itemType)) - .setTypes(itemType) - .setSize(0) - .setQuery(filter) - .execute() - .actionGet(); - return response.getHits().getTotalHits(); - */ - return 0L;//TODO remove ES7 + protected Long execute(Object... args) throws IOException { + SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(filter); + searchSourceBuilder.size(0); + searchRequest.source(searchSourceBuilder); + SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); + return response.getHits().getTotalHits().value; } }.catchingExecuteInClassLoader(true); } @@ -1381,47 +1329,27 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); TimeValue keepAlive = TimeValue.timeValueHours(1); - SearchRequestBuilder requestBuilder = null; + SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .fetchSource(true) + .query(query) + .size(size) + .from(offset); if (scrollTimeValidity != null) { keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueHours(1), "scrollTimeValidity"); - /* TODO implement ES7 - requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) - .setTypes(itemType) - .setFetchSource(true) - .setScroll(keepAlive) - .setFrom(offset) - .setQuery(query) - .setSize(size); - */ - } else { - /* TODO implement ES7 - requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) - .setTypes(itemType) - .setFetchSource(true) - .setQuery(query) - .setFrom(offset); - - */ + searchRequest.scroll(keepAlive); } if (size == Integer.MIN_VALUE) { - requestBuilder.setSize(defaultQueryLimit); + searchSourceBuilder.size(defaultQueryLimit); } else if (size 
!= -1) { - requestBuilder.setSize(size); + searchSourceBuilder.size(size); } else { // size == -1, use scroll query to retrieve all the results - /* TODO implement ES7 - requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) - .setTypes(itemType) - .setFetchSource(true) - .setScroll(keepAlive) - .setFrom(offset) - .setQuery(query) - .setSize(100); - */ + searchRequest.scroll(keepAlive); } if (routing != null) { - requestBuilder.setRouting(routing); + searchRequest.routing(routing); } if (sortBy != null) { String[] sortByArray = sortBy.split(","); @@ -1430,17 +1358,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String[] elements = sortByElement.split(":"); GeoDistanceSortBuilder distanceSortBuilder = SortBuilders.geoDistanceSort(elements[1], Double.parseDouble(elements[2]), Double.parseDouble(elements[3])).unit(DistanceUnit.KILOMETERS); if (elements.length > 4 && elements[4].equals("desc")) { - requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.DESC)); + searchSourceBuilder.sort(distanceSortBuilder.order(SortOrder.DESC)); } else { - requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.ASC)); + searchSourceBuilder.sort(distanceSortBuilder.order(SortOrder.ASC)); } } else { String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement, ":"), itemType); if (name != null) { if (sortByElement.endsWith(":desc")) { - requestBuilder = requestBuilder.addSort(name, SortOrder.DESC); + searchSourceBuilder.sort(name, SortOrder.DESC); } else { - requestBuilder = requestBuilder.addSort(name, SortOrder.ASC); + searchSourceBuilder.sort(name, SortOrder.ASC); } } else { // in the case of no data existing for the property, we will not add the sorting to the request. 
@@ -1449,10 +1377,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } - SearchResponse response = requestBuilder - .setVersion(true) - .execute() - .actionGet(); + searchSourceBuilder.version(true); + SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); if (size == -1) { // Scroll until no more hits are returned while (true) { @@ -1466,17 +1392,18 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, results.add(value); } - /* TODO implement ES7 - response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet(); - */ + SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId()); + searchScrollRequest.scroll(keepAlive); + response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT); + // If we have no more hits, exit if (response.getHits().getHits().length == 0) { break; } } - /* TODO implement ES7 - client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet(); - */ + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(response.getScrollId()); + client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); } else { SearchHits searchHits = response.getHits(); scrollIdentifier = response.getScrollId(); @@ -1513,11 +1440,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, long totalHits = 0; try { TimeValue keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueMinutes(10), "scrollTimeValidity"); - /* TODO implement ES7 - SearchResponse response = client.prepareSearchScroll(scrollIdentifier).setScroll(keepAlive).execute().actionGet(); + + SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollIdentifier); + searchScrollRequest.scroll(keepAlive); + SearchResponse response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT); if 
(response.getHits().getHits().length == 0) { - client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet(); + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(response.getScrollId()); + client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); } else { for (SearchHit searchHit : response.getHits().getHits()) { // add hit to results @@ -1528,9 +1459,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, results.add(value); } } - PartialList<T> result = new PartialList<T>(results, 0, response.getHits().getHits().length, response.getHits().getTotalHits()); - */ - PartialList<T> result = null;//TODO remove ES7 + PartialList<T> result = new PartialList<T>(results, 0, response.getHits().getHits().length, response.getHits().getTotalHits().value); if (scrollIdentifier != null) { result.setScrollIdentifier(scrollIdentifier); result.setScrollTimeValidity(scrollTimeValidity); @@ -1562,14 +1491,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery") { @Override - protected Map<String, Long> execute(Object... args) { + protected Map<String, Long> execute(Object... 
args) throws IOException { Map<String, Long> results = new LinkedHashMap<String, Long>(); - /* TODO implement ES7 - SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType)) - .setTypes(itemType) - .setSize(0) - .setQuery(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.size(0); + searchSourceBuilder.query(QueryBuilders.matchAllQuery()); List<AggregationBuilder> lastAggregation = new ArrayList<AggregationBuilder>(); if (aggregate != null) { @@ -1577,7 +1505,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String fieldName = aggregate.getField(); if (aggregate instanceof DateAggregate) { DateAggregate dateAggregate = (DateAggregate) aggregate; - DateHistogramAggregationBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(fieldName).dateHistogramInterval(new DateHistogramInterval((dateAggregate.getInterval()))); + DateHistogramAggregationBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(fieldName).calendarInterval(new DateHistogramInterval((dateAggregate.getInterval()))); if (dateAggregate.getFormat() != null) { dateHistogramBuilder.format(dateAggregate.getFormat()); } @@ -1646,11 +1574,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // filter on range items in the query block so we don't retrieve all the document before filtering the whole if (optimizedQuery) { for (AggregationBuilder aggregationBuilder : lastAggregation) { - builder.addAggregation(aggregationBuilder); + searchSourceBuilder.aggregation(aggregationBuilder); } if (filter != null) { - builder.setQuery(conditionESQueryBuilderDispatcher.buildFilter(filter)); + searchSourceBuilder.query(conditionESQueryBuilderDispatcher.buildFilter(filter)); } } else { if (filter != null) { @@ -1666,15 
+1594,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, globalAggregation.subAggregation(aggregationBuilder); } - builder.addAggregation(globalAggregation); + searchSourceBuilder.aggregation(globalAggregation); } - SearchResponse response = builder.execute().actionGet(); + searchRequest.source(searchSourceBuilder); + SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); Aggregations aggregations = response.getAggregations(); if (aggregations != null) { if (optimizedQuery) { if (response.getHits() != null) { - results.put("_filtered", response.getHits().getTotalHits()); + results.put("_filtered", response.getHits().getTotalHits().value); } } else { Global globalAgg = aggregations.get("global"); @@ -1698,7 +1627,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } - */ return results; } }.catchingExecuteInClassLoader(true); @@ -1738,27 +1666,19 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate") { @Override protected Object execute(Object... 
args) throws Exception { - /* TODO implement ES7 - IndicesStatsResponse statsResponse = client.indices().prepareStats(indexName + "-*") - .setIndexing(false) - .setGet(false) - .setSearch(false) - .setWarmer(false) - .setMerge(false) - .setFieldData(false) - .setFlush(false) - .setCompletion(false) - .setRefresh(false) - .execute() - .actionGet(); + + GetIndexRequest getIndexRequest = new GetIndexRequest("*"); + GetIndexResponse getIndexResponse = client.indices().get(getIndexRequest, RequestOptions.DEFAULT); + String[] indices = getIndexResponse.getIndices(); SimpleDateFormat d = new SimpleDateFormat("yyyy-MM"); List<String> toDelete = new ArrayList<String>(); - for (String currentIndexName : statsResponse.getIndices().keySet()) { - if (currentIndexName.startsWith(indexName + "-")) { + for (String currentIndexName : indices) { + int indexDatePrefixPos = currentIndexName.indexOf(INDEX_DATE_PREFIX); + if (indexDatePrefixPos > -1) { try { - Date indexDate = d.parse(currentIndexName.substring(indexName.length() + 1)); + Date indexDate = d.parse(currentIndexName.substring(indexDatePrefixPos + INDEX_DATE_PREFIX.length())); if (indexDate.before(date)) { toDelete.add(currentIndexName); @@ -1769,9 +1689,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } if (!toDelete.isEmpty()) { - client.admin().indices().prepareDelete(toDelete.toArray(new String[toDelete.size()])).execute().actionGet(); + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(toDelete.toArray(new String[toDelete.size()])); + client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); } - */ return null; } }.catchingExecuteInClassLoader(true); @@ -1781,42 +1701,49 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public void purge(final String scope) { new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".purgeWithScope") { @Override - protected Void execute(Object... 
args) { + protected Void execute(Object... args) throws IOException { QueryBuilder query = termQuery("scope", scope); - /* TODO implement ES7 - BulkRequestBuilder deleteByScope = client.prepareBulk(); + + BulkRequest deleteByScopeBulkRequest = new BulkRequest(); final TimeValue keepAlive = TimeValue.timeValueHours(1); - SearchResponse response = client.prepareSearch(indexName + "*") - .setScroll(keepAlive) - .setQuery(query) - .setSize(100).execute().actionGet(); + SearchRequest searchRequest = new SearchRequest(indexName + "*").scroll(keepAlive); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .query(query) + .size(100); + searchRequest.source(searchSourceBuilder); + SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); // Scroll until no more hits are returned while (true) { for (SearchHit hit : response.getHits().getHits()) { // add hit to bulk delete - deleteByScope.add(Requests.deleteRequest(hit.index()).type(hit.type()).id(hit.id())); + DeleteRequest deleteRequest = new DeleteRequest(hit.getIndex(), hit.getId()); + deleteByScopeBulkRequest.add(deleteRequest); } - response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet(); + SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId()); + searchScrollRequest.scroll(keepAlive); + response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT); // If we have no more hits, exit if (response.getHits().getHits().length == 0) { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(response.getScrollId()); + client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); break; } } // we're done with the scrolling, delete now - if (deleteByScope.numberOfActions() > 0) { - final BulkResponse deleteResponse = deleteByScope.get(); + if (deleteByScopeBulkRequest.numberOfActions() > 0) { + final BulkResponse deleteResponse = 
client.bulk(deleteByScopeBulkRequest, RequestOptions.DEFAULT); if (deleteResponse.hasFailures()) { // do something - logger.debug("Couldn't delete from scope " + scope + ":\n{}", deleteResponse.buildFailureMessage()); + logger.warn("Couldn't delete from scope " + scope + ":\n{}", deleteResponse.buildFailureMessage()); } } - */ return null; } }.catchingExecuteInClassLoader(true); @@ -1827,14 +1754,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return new InClassLoaderExecute<Map<String, Double>>(metricsService, this.getClass().getName() + ".getSingleValuesMetrics") { @Override - protected Map<String, Double> execute(Object... args) { + protected Map<String, Double> execute(Object... args) throws IOException { Map<String, Double> results = new LinkedHashMap<String, Double>(); - /* TODO implement ES7 - SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType)) - .setTypes(itemType) - .setSize(0) - .setQuery(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .size(0) + .query(QueryBuilders.matchAllQuery()); AggregationBuilder filterAggregation = AggregationBuilders.filter("metrics", conditionESQueryBuilderDispatcher.buildFilter(condition)); if (metrics != null) { @@ -1861,8 +1787,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } - builder.addAggregation(filterAggregation); - SearchResponse response = builder.execute().actionGet(); + searchSourceBuilder.aggregation(filterAggregation); + searchRequest.source(searchSourceBuilder); + SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); Aggregations aggregations = response.getAggregations(); if (aggregations != null) { @@ -1875,7 +1802,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } - */ return results; } 
}.catchingExecuteInClassLoader(true); @@ -1883,7 +1809,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private String getIndexNameForQuery(String itemType) { return indexNames.containsKey(itemType) ? indexNames.get(itemType) : - (itemsMonthlyIndexed.contains(itemType) ? indexName + "-*" : indexName); + (itemsMonthlyIndexed.contains(itemType) ? indexName + "*" + INDEX_DATE_PREFIX + "*" : indexName); } private String getConfig(Map<String, String> settings, String key, @@ -1970,4 +1896,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return itemCache.remove(itemId); } + private String getIndex(String itemType, Date dateHint) { + String indexItemTypePart = indexNames.containsKey(itemType) ? indexNames.get(itemType) : + (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? itemType + "-" + getMonthlyIndexPart(dateHint) : itemType); + + return indexName + "-" + indexItemTypePart; + } + + private String getMonthlyIndexPart(Date date) { + String d = new SimpleDateFormat("-yyyy-MM").format(date); + return INDEX_DATE_PREFIX + d; + } + } diff --git a/persistence-elasticsearch/core/src/test/java/org/apache/unomi/persistence/elasticsearch/ElasticsearchPersistenceTest.java b/persistence-elasticsearch/core/src/test/java/org/apache/unomi/persistence/elasticsearch/ElasticsearchPersistenceTest.java index 32e1653..192bcb1 100644 --- a/persistence-elasticsearch/core/src/test/java/org/apache/unomi/persistence/elasticsearch/ElasticsearchPersistenceTest.java +++ b/persistence-elasticsearch/core/src/test/java/org/apache/unomi/persistence/elasticsearch/ElasticsearchPersistenceTest.java @@ -17,10 +17,6 @@ package org.apache.unomi.persistence.elasticsearch; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; -import java.util.Arrays; -import java.util.Collection; -import java.util.UUID; -import java.util.logging.Logger; import org.apache.http.HttpHost; import 
org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.index.IndexRequest; @@ -47,6 +43,11 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; +import java.util.logging.Logger; + @RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class) @ThreadLeakScope(value = ThreadLeakScope.Scope.NONE) public class ElasticsearchPersistenceTest { @@ -56,10 +57,10 @@ public class ElasticsearchPersistenceTest { private static final String CLUSTER_NAME = "unomi-cluster-test"; private static final String NODE_NAME = "unomi-node-test"; private static final String HOST = "127.0.0.1"; - private static final int HTTP_PORT_NODE_1 = 9200; - private static final int HTTP_PORT_NODE_2 = 9201; - private static final int TRANSPORT_PORT_NODE_1 = 9300; - private static final int TRANSPORT_PORT_NODE_2 = 9301; + private static final int HTTP_PORT_NODE_1 = 9200+10; + private static final int HTTP_PORT_NODE_2 = 9201+10; + private static final int TRANSPORT_PORT_NODE_1 = 9300+10; + private static final int TRANSPORT_PORT_NODE_2 = 9301+10; private static RestHighLevelClient restHighLevelClient; diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java index 27e5ced..a2096a0 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java @@ -17,15 +17,6 @@ package org.apache.unomi.plugins.baseplugin.conditions; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; 
-import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; import ognl.Node; import ognl.Ognl; import ognl.OgnlContext; @@ -33,11 +24,7 @@ import ognl.OgnlException; import ognl.enhance.ExpressionAccessor; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.unomi.api.CustomItem; -import org.apache.unomi.api.Event; -import org.apache.unomi.api.Item; -import org.apache.unomi.api.Profile; -import org.apache.unomi.api.Session; +import org.apache.unomi.api.*; import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.persistence.elasticsearch.conditions.ConditionContextHelper; import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluator; @@ -46,6 +33,10 @@ import org.apache.unomi.persistence.spi.PropertyHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.regex.Pattern; + /** * Evaluator for property comparison conditions */ @@ -390,10 +381,10 @@ public class PropertyConditionEvaluator implements ConditionEvaluator { if (value == null) { return null; } - /* TODO implement ES7 if (value instanceof Date) { return ((Date) value); } else { + /* ES7 DateMathParser parser = new DateMathParser(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER); try { return new Date(parser.parse(value.toString(), new LongSupplier() { @@ -404,8 +395,8 @@ public class PropertyConditionEvaluator implements ConditionEvaluator { } catch (ElasticsearchParseException e) { logger.warn("unable to parse date " + value.toString(), e); } + */ } - */ return null; } diff --git a/plugins/request/pom.xml b/plugins/request/pom.xml index e2c24c0..516e787 100644 --- a/plugins/request/pom.xml +++ b/plugins/request/pom.xml @@ -172,16 +172,20 @@ <instructions> <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency> <Import-Package> + COM.newmonics.PercClassLoader;resolution:=optional, + android.os;resolution:=optional, 
org.apache.avalon.framework.logger;resolution:=optional, org.apache.log;resolution:=optional, org.apache.log4j;resolution:=optional, sun.misc;resolution:=optional, sun.nio.ch;resolution:=optional, + jrockit.vm;resolution:=optional, kotlin.reflect;resolution:=optional, kotlin.reflect.jvm;resolution:=optional, nl.basjes.shaded.com.google.errorprone.annotations;resolution:=optional, nl.basjes.shaded.com.google.errorprone.annotations.concurrent;resolution:=optional, org.checkerframework.checker.nullness.qual;resolution:=optional, + sun.reflect;resolution:=optional, * </Import-Package> </instructions> diff --git a/pom.xml b/pom.xml index 8ab1be9..9dd43d3 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ <version.karaf>4.1.7</version.karaf> <version.karaf.cellar>4.1.0</version.karaf.cellar> <version.pax.exam>4.11.0</version.pax.exam> - <elasticsearch.version>7.4.0</elasticsearch.version> + <elasticsearch.version>7.4.2</elasticsearch.version> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> @@ -1010,7 +1010,7 @@ <plugin> <groupId>org.apache.felix</groupId> <artifactId>maven-bundle-plugin</artifactId> - <version>3.2.0</version> + <version>4.2.1</version> <extensions>true</extensions> <configuration> <instructions>