This is an automated email from the ASF dual-hosted git repository. shuber pushed a commit to branch UNOMI-225-ES7 in repository https://gitbox.apache.org/repos/asf/unomi.git
commit a605e2731447e63341ba1ef1b1b9c8fc26360a64 Author: Francois Papon <[email protected]> AuthorDate: Thu Oct 17 11:34:48 2019 +0200 WIP [UNOMI-225] --- persistence-elasticsearch/core/pom.xml | 185 +++++++++------- .../ElasticSearchPersistenceServiceImpl.java | 232 +++++++++++++-------- .../ElasticsearchPersistenceTest.java | 172 +++++++++++++++ .../conditions/PropertyConditionEvaluator.java | 25 ++- pom.xml | 2 +- 5 files changed, 442 insertions(+), 174 deletions(-) diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml index 18f18f6..6f43269 100644 --- a/persistence-elasticsearch/core/pom.xml +++ b/persistence-elasticsearch/core/pom.xml @@ -62,7 +62,43 @@ <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> - <version>7.0.0-beta1</version> + <version>${elasticsearch.version}</version> + </dependency> + <dependency> + <groupId>org.elasticsearch.test</groupId> + <artifactId>framework</artifactId> + <version>${elasticsearch.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch.plugin</groupId> + <artifactId>transport-netty4-client</artifactId> + <version>${elasticsearch.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-test-framework</artifactId> + <version>8.2.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.12.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>2.12.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <version>2.12.1</version> + <scope>test</scope> </dependency> <!-- The following are optional dependencies from the ElasticSearch that are made mandatory --> @@ -142,7 +178,6 @@ <artifactId>commons-io</artifactId> <scope>test</scope> </dependency> - <dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast-all</artifactId> @@ -161,75 +196,69 @@ <configuration> <instructions> <Import-Package> - android.app;resolution:=optional, - com.carrotsearch.randomizedtesting;resolution:=optional, - com.google.appengine.api;resolution:=optional, - com.google.apphosting.api;resolution:=optional, - com.google.protobuf;resolution:=optional, - com.twitter.util;resolution:=optional, - com.vividsolutions.jts.*;resolution:=optional, - javax.annotation;resolution:=optional, - javax.crypto;resolution:=optional, - javax.crypto.spec;resolution:=optional, - javax.net.ssl;resolution:=optional, - javax.security.auth.x500;resolution:=optional, - javax.security.cert;resolution:=optional, - javax.servlet;resolution:=optional, - javax.servlet.http;resolution:=optional, - javax.xml.bind;resolution:=optional, - javax.xml.parsers;resolution:=optional, - javax.xml.transform;resolution:=optional, - javax.xml.transform.dom;resolution:=optional, - javax.xml.transform.stream;resolution:=optional, - org.apache.commons.logging;resolution:=optional, - org.apache.log;resolution:=optional, - org.apache.regexp;resolution:=optional, - org.apache.tomcat.jni;resolution:=optional, - org.bouncycastle.*;resolution:=optional, - org.eclipse.jetty.npn;resolution:=optional, - org.jboss.logging;resolution:=optional, - org.jboss.marshalling;resolution:=optional, - org.jruby;resolution:=optional, - org.jruby.embed;resolution:=optional, - sun.misc;resolution:=optional, - sun.security.util;resolution:=optional, - sun.security.x509;resolution:=optional, - com.google.protobuf.nano;resolution:=optional, - com.jcraft.jzlib;resolution:=optional, - com.ning.compress;resolution:=optional, - com.ning.compress.lzf;resolution:=optional, - com.ning.compress.lzf.util;resolution:=optional, - com.sun.management;resolution:=optional, - javassist;resolution:=optional, - lzma.sdk;resolution:=optional, - lzma.sdk.lzma;resolution:=optional, - net.jpountz.lz4;resolution:=optional, - net.jpountz.xxhash;resolution:=optional, - org.apache.tomcat;resolution:=optional, - org.eclipse.jetty.alpn;resolution:=optional, - org.joda.convert;resolution:=optional, - org.locationtech.spatial4j.context;resolution:=optional, - org.locationtech.spatial4j.context.jts;resolution:=optional, - org.locationtech.spatial4j.distance;resolution:=optional, - org.locationtech.spatial4j.exception;resolution:=optional, - org.locationtech.spatial4j.io;resolution:=optional, - org.locationtech.spatial4j.shape;resolution:=optional, - org.locationtech.spatial4j.shape.impl;resolution:=optional, - org.locationtech.spatial4j.shape.jts;resolution:=optional, - org.zeromq;resolution:=optional, - org.apache.commons.compress.compressors;resolution:=optional, - org.apache.commons.compress.utils;resolution:=optional, - org.apache.commons.csv;resolution:=optional, - org.apache.kafka.clients.producer;resolution:=optional, - javax.persistence;resolution:=optional, - com.google.errorprone.annotations.concurrent;resolution:=optional, - com.lmax.disruptor;resolution:=optional, - com.lmax.disruptor.dsl;resolution:=optional, - com.fasterxml.jackson.dataformat.xml;resolution:=optional, - com.fasterxml.jackson.dataformat.xml.annotation;resolution:=optional, - com.fasterxml.jackson.dataformat.xml.util;resolution:=optional, - io.netty.internal.tcnative;resolution:=optional, - org.conscrypt;resolution:=optional, +<!-- com.google.protobuf;resolution:=optional,--> +<!-- com.twitter.util;resolution:=optional,--> +<!-- com.vividsolutions.jts.*;resolution:=optional,--> +<!-- javax.annotation;resolution:=optional,--> +<!-- javax.crypto;resolution:=optional,--> +<!-- javax.crypto.spec;resolution:=optional,--> +<!-- javax.net.ssl;resolution:=optional,--> +<!-- javax.security.auth.x500;resolution:=optional,--> +<!-- javax.security.cert;resolution:=optional,--> +<!-- javax.servlet;resolution:=optional,--> +<!-- javax.servlet.http;resolution:=optional,--> +<!-- javax.xml.bind;resolution:=optional,--> +<!-- javax.xml.parsers;resolution:=optional,--> +<!-- javax.xml.transform;resolution:=optional,--> +<!-- javax.xml.transform.dom;resolution:=optional,--> +<!-- javax.xml.transform.stream;resolution:=optional,--> +<!-- org.apache.commons.logging;resolution:=optional,--> +<!-- org.apache.regexp;resolution:=optional,--> +<!-- org.apache.tomcat.jni;resolution:=optional,--> +<!-- org.bouncycastle.*;resolution:=optional,--> +<!-- org.eclipse.jetty.npn;resolution:=optional,--> +<!-- org.jboss.logging;resolution:=optional,--> +<!-- org.jboss.marshalling;resolution:=optional,--> +<!-- org.jruby;resolution:=optional,--> +<!-- org.jruby.embed;resolution:=optional,--> +<!-- sun.misc;resolution:=optional,--> +<!-- sun.security.util;resolution:=optional,--> +<!-- sun.security.x509;resolution:=optional,--> +<!-- com.google.protobuf.nano;resolution:=optional,--> +<!-- com.jcraft.jzlib;resolution:=optional,--> +<!-- com.ning.compress;resolution:=optional,--> +<!-- com.ning.compress.lzf;resolution:=optional,--> +<!-- com.ning.compress.lzf.util;resolution:=optional,--> +<!-- javassist;resolution:=optional,--> +<!-- lzma.sdk;resolution:=optional,--> +<!-- lzma.sdk.lzma;resolution:=optional,--> +<!-- net.jpountz.lz4;resolution:=optional,--> +<!-- net.jpountz.xxhash;resolution:=optional,--> +<!-- org.apache.tomcat;resolution:=optional,--> +<!-- org.eclipse.jetty.alpn;resolution:=optional,--> +<!-- org.joda.convert;resolution:=optional,--> +<!-- org.locationtech.spatial4j.context;resolution:=optional,--> +<!-- org.locationtech.spatial4j.context.jts;resolution:=optional,--> +<!-- org.locationtech.spatial4j.distance;resolution:=optional,--> +<!-- org.locationtech.spatial4j.exception;resolution:=optional,--> +<!-- org.locationtech.spatial4j.io;resolution:=optional,--> +<!-- org.locationtech.spatial4j.shape;resolution:=optional,--> +<!-- org.locationtech.spatial4j.shape.impl;resolution:=optional,--> +<!-- org.locationtech.spatial4j.shape.jts;resolution:=optional,--> +<!-- org.zeromq;resolution:=optional,--> +<!-- org.apache.commons.compress.compressors;resolution:=optional,--> +<!-- org.apache.commons.compress.utils;resolution:=optional,--> +<!-- org.apache.commons.csv;resolution:=optional,--> +<!-- org.apache.kafka.clients.producer;resolution:=optional,--> +<!-- javax.persistence;resolution:=optional,--> +<!-- com.google.errorprone.annotations.concurrent;resolution:=optional,--> +<!-- com.lmax.disruptor;resolution:=optional,--> +<!-- com.lmax.disruptor.dsl;resolution:=optional,--> +<!-- com.fasterxml.jackson.dataformat.xml;resolution:=optional,--> +<!-- com.fasterxml.jackson.dataformat.xml.annotation;resolution:=optional,--> +<!-- com.fasterxml.jackson.dataformat.xml.util;resolution:=optional,--> +<!-- io.netty.internal.tcnative;resolution:=optional,--> +<!-- org.conscrypt;resolution:=optional,--> org.apache.unomi.api, org.apache.unomi.api.rules, org.apache.unomi.api.actions, @@ -242,13 +271,13 @@ org.apache.unomi.api.segments, * </Import-Package> - <Export-Package> - org.elasticsearch.*;version="${elasticsearch.version}", - org.elasticsearch.index.query.*;version="${elasticsearch.version}", - org.apache.unomi.persistence.elasticsearch.conditions;version="${project.version}" - </Export-Package> - <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency> - <Embed-Transitive>true</Embed-Transitive> +<!-- <Export-Package>--> +<!-- org.elasticsearch.*;version="${elasticsearch.version}",--> +<!-- org.elasticsearch.index.query.*;version="${elasticsearch.version}",--> +<!-- org.apache.unomi.persistence.elasticsearch.conditions;version="${project.version}"--> +<!-- </Export-Package>--> +<!-- <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>--> +<!-- <Embed-Transitive>true</Embed-Transitive>--> </instructions> </configuration> </plugin> diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index c579abf..8c0a500 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -17,95 +17,82 @@ package org.apache.unomi.persistence.elasticsearch; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; + import com.hazelcast.core.HazelcastInstance; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.unomi.api.Item; import org.apache.unomi.api.PartialList; import org.apache.unomi.api.TimestampedItem; import org.apache.unomi.api.conditions.Condition; -import org.apache.unomi.api.query.DateRange; -import org.apache.unomi.api.query.IpRange; -import org.apache.unomi.api.query.NumericRange; import org.apache.unomi.metrics.MetricAdapter; import org.apache.unomi.metrics.MetricsService; -import org.apache.unomi.persistence.elasticsearch.conditions.*; +import org.apache.unomi.persistence.elasticsearch.conditions.ConditionContextHelper; +import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder; +import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher; +import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluator; +import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluatorDispatcher; import org.apache.unomi.persistence.spi.PersistenceService; -import org.apache.unomi.persistence.spi.aggregate.*; +import org.apache.unomi.persistence.spi.aggregate.BaseAggregate; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; -import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; -import org.elasticsearch.action.bulk.*; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.*; -import org.elasticsearch.client.core.AcknowledgedResponse; +import org.elasticsearch.client.Node; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.core.MainResponse; import org.elasticsearch.client.indices.GetIndexRequest; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; -import org.elasticsearch.index.reindex.BulkByScrollResponse; -import org.elasticsearch.index.reindex.UpdateByQueryAction; -import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptException; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.aggregations.*; -import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; -import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; -import org.elasticsearch.search.aggregations.bucket.filter.Filter; -import org.elasticsearch.search.aggregations.bucket.global.Global; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.range.IpRangeAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; -import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; -import org.osgi.framework.*; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.BundleEvent; +import org.osgi.framework.ServiceReference; +import org.osgi.framework.SynchronousBundleListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.URL; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.index.query.QueryBuilders.termQuery; - @SuppressWarnings("rawtypes") public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener { @@ -146,8 +133,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private String bulkProcessorFlushInterval = "5s"; private String bulkProcessorBackoffPolicy = "exponential"; - private String minimalElasticSearchVersion = "5.0.0"; - private String maximalElasticSearchVersion = "5.7.0"; + private String minimalElasticSearchVersion = "7.0.0"; + private String maximalElasticSearchVersion = "7.4.0"; private int aggregateQueryBucketSize = 5000; @@ -317,25 +304,25 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, RestClient.builder(nodeList.toArray(new Node[nodeList.size()]))); // let's now check the versions of all the nodes in the cluster, to make sure they are as expected. - try { - NodesInfoResponse nodesInfoResponse = client.cluster().prepareNodesInfo() - .all().execute().get(); - - org.elasticsearch.Version minimalVersion = org.elasticsearch.Version.fromString(minimalElasticSearchVersion); - org.elasticsearch.Version maximalVersion = org.elasticsearch.Version.fromString(maximalElasticSearchVersion); - for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) { - org.elasticsearch.Version version = nodeInfo.getVersion(); - if (version.before(minimalVersion) || - version.equals(maximalVersion) || - version.after(maximalVersion)) { - throw new Exception("ElasticSearch version on node " + nodeInfo.getHostname() + " is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !"); - } - } - } catch (InterruptedException e) { - throw new Exception("Error checking ElasticSearch versions", e); - } catch (ExecutionException e) { - throw new Exception("Error checking ElasticSearch versions", e); - } + MainResponse response = client.info(RequestOptions.DEFAULT); + org.elasticsearch.client.core.MainResponse.Version version = response.getVersion(); + //TODO change check of version prerequisite +// try { +// org.elasticsearch.Version minimalVersion = org.elasticsearch.Version.fromString(minimalElasticSearchVersion); +// org.elasticsearch.Version maximalVersion = org.elasticsearch.Version.fromString(maximalElasticSearchVersion); +// for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) { +// org.elasticsearch.Version version = nodeInfo.getVersion(); +// if (version.before(minimalVersion) || +// version.equals(maximalVersion) || +// version.after(maximalVersion)) { +// throw new Exception("ElasticSearch version on node " + nodeInfo.getHostname() + " is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !"); +// } +// } +// } catch (InterruptedException e) { +// throw new Exception("Error checking ElasticSearch versions", e); +// } catch (ExecutionException e) { +// throw new Exception("Error checking ElasticSearch versions", e); +// } loadPredefinedMappings(bundleContext, false); @@ -484,7 +471,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public void stop() { new InClassLoaderExecute<Object>(null, null) { - protected Object execute(Object... args) { + protected Object execute(Object... args) throws IOException { logger.info("Closing ElasticSearch persistence backend..."); if (bulkProcessor != null) { try { @@ -626,6 +613,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexName(dateHint) : indexName); + SearchRequest searchRequest = new SearchRequest(index); + searchRequest.searchType(itemType); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.idsQuery().addIds(itemId)); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + /* TODO implement ES7 GetResponse response = client.prepareGet(index, itemType, itemId) .execute() .actionGet(); @@ -639,6 +632,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } else { return null; } + */ } } catch (IndexNotFoundException e) { // this can happen if we are just testing the existence of the item, it is not always an error. @@ -646,6 +640,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } catch (Exception ex) { throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, ex); } + return null;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); @@ -667,6 +662,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, putInCache(itemId, item); String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexName(((TimestampedItem) item).getTimeStamp()) : indexName); + /* TODO implement ES7 IndexRequestBuilder indexBuilder = client.prepareIndex(index, itemType, itemId) .setSource(source, XContentType.JSON); if (routingByType.containsKey(itemType)) { @@ -684,6 +680,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, index, itemType, itemId, e); return false; } + */ return true; } catch (IOException e) { throw new Exception("Error saving item " + item, e); @@ -712,6 +709,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName); + /* TODO implement ES7 if (bulkProcessor == null) { client.prepareUpdate(index, itemType, itemId).setDoc(source) .execute() @@ -720,6 +718,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setDoc(source).request(); bulkProcessor.add(updateRequest); } + */ return true; } catch (IndexNotFoundException e) { throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); @@ -746,6 +745,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (int i = 0; i < scripts.length; i++) { Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]); + /* TODO implement ES7 client.indices().prepareRefresh(index).get(); UpdateByQueryRequestBuilder ubqrb = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); @@ -753,6 +753,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, BulkByScrollResponse response = ubqrb.setSlices(2) .setMaxRetries(1000).abortOnVersionConflict(false).script(actualScript) .filter(conditionESQueryBuilderDispatcher.buildFilter(conditions[i])).get(); + if (response.getBulkFailures().size() > 0) { for (BulkItemResponse.Failure failure : response.getBulkFailures()) { logger.error("Failure : cause={} , message={}", failure.getCause(), failure.getMessage()); @@ -769,6 +770,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (response.getNoops() > 0) { logger.warn("Update By Query ended with {} noops!", response.getNoops()); } + */ } return true; } catch (IndexNotFoundException e) { @@ -800,6 +802,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams); + /* TODO implement ES7 if (bulkProcessor == null) { client.prepareUpdate(index, itemType, itemId).setScript(actualScript) .execute() @@ -808,6 +811,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).request(); bulkProcessor.add(updateRequest); } + */ + return true; } catch (IndexNotFoundException e) { throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); @@ -828,8 +833,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); + /* TODO implement ES7 client.prepareDelete(getIndexNameForQuery(itemType), itemType, itemId) .execute().actionGet(); + */ return true; } catch (Exception e) { throw new Exception("Cannot remove", e); @@ -849,6 +856,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = Item.getItemType(clazz); + /* TODO implement ES7 BulkRequestBuilder deleteByScope = client.prepareBulk(); final TimeValue keepAlive = TimeValue.timeValueHours(1); @@ -885,6 +893,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.debug("Couldn't remove by query " + query + ":\n{}", deleteResponse.buildFailureMessage()); } } + */ return true; } catch (Exception e) { @@ -903,8 +912,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean indexTemplateExists(final String templateName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists") { protected Boolean execute(Object... args) { + /* TODO implement ES7 GetIndexTemplatesResponse getIndexTemplatesResponse = client.indices().prepareGetTemplates(templateName).execute().actionGet(); return getIndexTemplatesResponse.getIndexTemplates().size() == 1; + */ + return true;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -917,8 +929,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean removeIndexTemplate(final String templateName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate") { protected Boolean execute(Object... args) { - AcknowledgedResponse deleteIndexTemplateResponse = client.indices().deleteTemplate(new DeleteIndexTemplateRequest(templateName)).actionGet(); + /* TODO implement ES7 + AcknowledgedResponse deleteIndexTemplateResponse = client.indices().deleteTemplate(new DeleteIndexTemplateRequest(templateName), null).actionGet(); return deleteIndexTemplateResponse.isAcknowledged(); + */ + return true;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -931,6 +946,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean createMonthlyIndexTemplate() { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate") { protected Boolean execute(Object... args) { + /* TODO implement ES7 PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest("context-monthly-indices") .template(indexName + "-*") .settings("{\n" + @@ -958,6 +974,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, putIndexTemplateRequest.mappings().putAll(indexMappings); AcknowledgedResponse putIndexTemplateResponse = client.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT); return putIndexTemplateResponse.isAcknowledged(); + */ + return true;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -970,6 +988,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean createIndex(final String indexName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createItem") { protected Boolean execute(Object... args) { + /* TODO implement ES7 IndicesExistsResponse indicesExistsResponse = client.indices().prepareExists(indexName).execute().actionGet(); boolean indexExists = indicesExistsResponse.isExists(); if (!indexExists) { @@ -983,6 +1002,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, internalCreateIndex(indexName, indexMappings); } return !indexExists; + */ + return true;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -995,12 +1016,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean removeIndex(final String indexName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex") { protected Boolean execute(Object... args) { + /* TODO implement ES7 IndicesExistsResponse indicesExistsResponse = client.indices().prepareExists(indexName).execute().actionGet(); boolean indexExists = indicesExistsResponse.isExists(); if (indexExists) { client.indices().prepareDelete(indexName).execute().actionGet(); } return indexExists; + */ + return true;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); if (result == null) { @@ -1011,6 +1035,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } private void internalCreateIndex(String indexName, Map<String, String> mappings) { + /* TODO implement ES7 CreateIndexRequestBuilder builder = client.indices().prepareCreate(indexName) .setSettings("{\n" + " \"index\" : {\n" + @@ -1033,16 +1058,18 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } builder.execute().actionGet(); - + */ } private void createMapping(final String type, final String source, final String indexName) { + /* TODO implement ES7 client.indices() .preparePutMapping(indexName) .setType(type) .setSource(source, XContentType.JSON) .execute().actionGet(); + */ } @Override @@ -1050,6 +1077,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (type.equals("_default_")) { return; } + /* TODO implement ES7 if (itemsMonthlyIndexed.contains(type)) { createMonthlyIndexTemplate(); if (client.indices().prepareExists(indexName + "-*").execute().actionGet().isExists()) { @@ -1062,6 +1090,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } else { createMapping(type, source, indexName); } + */ } @Override @@ -1070,14 +1099,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @SuppressWarnings("unchecked") protected Map<String, Map<String, Object>> execute(Object... args) throws Exception { // Get all mapping for current itemType + /* TODO implement ES7 GetMappingsResponse getMappingsResponse = client.indices().prepareGetMappings().setTypes(itemType).execute().actionGet(); ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getMappingsResponse.getMappings(); // create a list of Keys to get the mappings in chronological order // in case there is monthly context then the mapping will be added from the oldest to the most recent one Set<String> orderedKeys = new TreeSet<>(Arrays.asList(mappings.keys().toArray(String.class))); - + */ Map<String, Map<String, Object>> result = new HashMap<>(); + /* TODO implement ES7 try { for (String key : orderedKeys) { if (mappings.containsKey(key)) { @@ -1106,6 +1137,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } catch (Throwable t) { throw new Exception("Cannot get mapping for itemType="+ itemType, t); } + */ return result; } }.catchingExecuteInClassLoader(true); @@ -1169,10 +1201,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, //Index the query = register it in the percolator try { logger.info("Saving query : " + queryName); + /* TODO implement ES7 client.prepareIndex(indexName, ".percolator", queryName) .setSource(query, XContentType.JSON) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .execute().actionGet(); + */ return true; } catch (Exception e) { throw new Exception("Cannot save query", e); @@ -1201,9 +1235,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Boolean execute(Object... args) throws Exception { //Index the query = register it in the percolator try { + /* TODO implement ES7 client.prepareDelete(indexName, ".percolator", queryName) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .execute().actionGet(); + */ return true; } catch (Exception e) { throw new Exception("Cannot delete query", e); @@ -1320,6 +1356,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override protected Long execute(Object... args) { + /* TODO implement ES7 SearchResponse response = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) .setSize(0) @@ -1327,6 +1364,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .execute() .actionGet(); return response.getHits().getTotalHits(); + */ + return 0L;//TODO remove ES7 } }.catchingExecuteInClassLoader(true); } @@ -1345,6 +1384,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SearchRequestBuilder requestBuilder = null; if (scrollTimeValidity != null) { keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueHours(1), "scrollTimeValidity"); + /* TODO implement ES7 requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) .setFetchSource(true) @@ -1352,12 +1392,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .setFrom(offset) .setQuery(query) .setSize(size); + */ } else { + /* TODO implement ES7 requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) .setFetchSource(true) .setQuery(query) .setFrom(offset); + + */ } if (size == Integer.MIN_VALUE) { @@ -1366,6 +1410,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, requestBuilder.setSize(size); } else { // size == -1, use scroll query to retrieve all the results + /* TODO implement ES7 requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) .setFetchSource(true) @@ -1373,6 +1418,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .setFrom(offset) .setQuery(query) .setSize(100); + */ } if (routing != null) { requestBuilder.setRouting(routing); @@ -1420,18 +1466,21 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, results.add(value); } + /* TODO implement ES7 response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet(); - + */ // If we have no more hits, exit if (response.getHits().getHits().length == 0) { break; } } + /* TODO implement ES7 client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet(); + */ } else { SearchHits searchHits = response.getHits(); scrollIdentifier = response.getScrollId(); - totalHits = searchHits.getTotalHits(); + totalHits = searchHits.getTotalHits().value; for (SearchHit searchHit : searchHits) { String sourceAsString = searchHit.getSourceAsString(); final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); @@ -1464,6 +1513,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, long totalHits = 0; try { TimeValue keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueMinutes(10), "scrollTimeValidity"); + /* TODO implement ES7 SearchResponse response = client.prepareSearchScroll(scrollIdentifier).setScroll(keepAlive).execute().actionGet(); if (response.getHits().getHits().length == 0) { @@ -1479,6 +1529,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } PartialList<T> result = new PartialList<T>(results, 0, response.getHits().getHits().length, response.getHits().getTotalHits()); + */ + PartialList<T> result = null;//TODO remove ES7 if (scrollIdentifier != null) { result.setScrollIdentifier(scrollIdentifier); result.setScrollTimeValidity(scrollTimeValidity); @@ -1513,11 +1565,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Map<String, Long> execute(Object... args) { Map<String, Long> results = new LinkedHashMap<String, Long>(); + /* TODO implement ES7 SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) .setSize(0) .setQuery(QueryBuilders.matchAllQuery()); - List<AggregationBuilder> lastAggregation = new ArrayList<AggregationBuilder>(); if (aggregate != null) { @@ -1646,7 +1698,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } - + */ return results; } }.catchingExecuteInClassLoader(true); @@ -1669,7 +1721,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (bulkProcessor != null) { bulkProcessor.flush(); } - client.indices().refresh(Requests.refreshRequest(), RequestOptions.DEFAULT); + try { + client.indices().refresh(Requests.refreshRequest(), RequestOptions.DEFAULT); + } catch (IOException e) { + e.printStackTrace();//TODO manage ES7 + } return true; } }.catchingExecuteInClassLoader(true); @@ -1682,6 +1738,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate") { @Override protected Object execute(Object... args) throws Exception { + /* TODO implement ES7 IndicesStatsResponse statsResponse = client.indices().prepareStats(indexName + "-*") .setIndexing(false) .setGet(false) @@ -1714,6 +1771,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (!toDelete.isEmpty()) { client.admin().indices().prepareDelete(toDelete.toArray(new String[toDelete.size()])).execute().actionGet(); } + */ return null; } }.catchingExecuteInClassLoader(true); @@ -1725,7 +1783,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override protected Void execute(Object... args) { QueryBuilder query = termQuery("scope", scope); - + /* TODO implement ES7 BulkRequestBuilder deleteByScope = client.prepareBulk(); final TimeValue keepAlive = TimeValue.timeValueHours(1); @@ -1758,7 +1816,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.debug("Couldn't delete from scope " + scope + ":\n{}", deleteResponse.buildFailureMessage()); } } - + */ return null; } }.catchingExecuteInClassLoader(true); @@ -1772,6 +1830,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Map<String, Double> execute(Object... args) { Map<String, Double> results = new LinkedHashMap<String, Double>(); + /* TODO implement ES7 SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) .setSize(0) @@ -1816,6 +1875,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } + */ return results; } }.catchingExecuteInClassLoader(true); diff --git a/persistence-elasticsearch/core/src/test/java/org/apache/unomi/persistence/elasticsearch/ElasticsearchPersistenceTest.java b/persistence-elasticsearch/core/src/test/java/org/apache/unomi/persistence/elasticsearch/ElasticsearchPersistenceTest.java new file mode 100644 index 0000000..32e1653 --- /dev/null +++ b/persistence-elasticsearch/core/src/test/java/org/apache/unomi/persistence/elasticsearch/ElasticsearchPersistenceTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.persistence.elasticsearch; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; +import java.util.logging.Logger; +import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.core.MainResponse; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.Environment; +import org.elasticsearch.node.MockNode; +import org.elasticsearch.node.Node; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.transport.Netty4Plugin; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class) +@ThreadLeakScope(value = ThreadLeakScope.Scope.NONE) +public class ElasticsearchPersistenceTest { + + private static final Logger logger = Logger.getLogger(ElasticsearchPersistenceTest.class.getName()); + + private static final String CLUSTER_NAME = "unomi-cluster-test"; + private static final String NODE_NAME = "unomi-node-test"; + private static final String HOST = "127.0.0.1"; + private static final int HTTP_PORT_NODE_1 = 9200; + private static final int HTTP_PORT_NODE_2 = 9201; + private static final int TRANSPORT_PORT_NODE_1 = 9300; + private static final int TRANSPORT_PORT_NODE_2 = 9301; + + private static RestHighLevelClient restHighLevelClient; + + private static Node node1; + private static Node node2; + + @BeforeClass + public static void setup() throws Exception { + Collection plugins = Arrays.asList(Netty4Plugin.class); + + Settings settingsNode1 = Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), CLUSTER_NAME) + .put(Node.NODE_NAME_SETTING.getKey(), NODE_NAME + "-1") + .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME) + .put(Environment.PATH_HOME_SETTING.getKey(), "target/data-1") + .put(Environment.PATH_DATA_SETTING.getKey(), "target/data-1") + .put("network.host", HOST) + .put("http.port", HTTP_PORT_NODE_1) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME) + .put("transport.port", TRANSPORT_PORT_NODE_1) + .build(); + node1 = new MockNode(settingsNode1, plugins); + node1.start(); + + Settings settingsNode2 = Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), CLUSTER_NAME) + .put(Node.NODE_NAME_SETTING.getKey(), NODE_NAME + "-2") + .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME) + .put(Environment.PATH_HOME_SETTING.getKey(), "target/data-2") + .put(Environment.PATH_DATA_SETTING.getKey(), "target/data-2") + .put("network.host", HOST) + .put("http.port", HTTP_PORT_NODE_2) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME) + .put("transport.port", TRANSPORT_PORT_NODE_2) + .build(); + node2 = new MockNode(settingsNode2, plugins); + node2.start(); + + restHighLevelClient = new RestHighLevelClient(RestClient.builder( + new HttpHost(HOST, HTTP_PORT_NODE_1, "http"), + new HttpHost(HOST, HTTP_PORT_NODE_2, "http"))); + } + + @AfterClass + public static void teardown() throws Exception { + IOUtils.close(restHighLevelClient); + if (node1 != null) { + node1.close(); + } + if (node2 != null) { + node2.close(); + } + } + + @Test + public void testGetClusterInfo() throws Exception { + MainResponse response = restHighLevelClient.info(RequestOptions.DEFAULT); + logger.info("Cluster getMinimumIndexCompatibilityVersion: " + response.getVersion().getMinimumIndexCompatibilityVersion()); + logger.info("Cluster getMinimumWireCompatibilityVersion: " + response.getVersion().getMinimumWireCompatibilityVersion()); + logger.info("Cluster number: " + response.getVersion().getNumber()); + } + + @Test + public void testCreateIndex() throws Exception { + restHighLevelClient.info(RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build()); + CreateIndexRequest request = new CreateIndexRequest("unomi-index-1"); + CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT); + if (response.isAcknowledged()) { + logger.info(">>> Create index :: ok :: name = " + response.index()); + } else { + logger.info(">>> Create index :: not acknowledged"); + } + +// ClusterHealthResponse actionGet = restHighLevelClient.cluster() +// .health(Requests.clusterHealthRequest("unomi-index-1").waitForGreenStatus().waitForEvents(Priority.LANGUID) +// .waitForNoRelocatingShards(true), RequestOptions.DEFAULT); +// Assert.assertNotNull(actionGet); +// +// switch (actionGet.getStatus()) { +// case GREEN: +// logger.info(">>> Cluster State :: GREEN"); +// break; +// case YELLOW: +// logger.info(">>> Cluster State :: YELLOW"); +// break; +// case RED: +// logger.info(">>> Cluster State :: RED"); +// break; +// } +// Assert.assertNotEquals(actionGet.getStatus(), ClusterHealthStatus.RED); + + IndexRequest indexRequest = new IndexRequest("unomi-index-1"); + indexRequest.id(UUID.randomUUID().toString()); + String type = "{\"type\":\"unomi-type\"}"; + String source = "{\"name\":\"unomi-name\"}"; + indexRequest.source(XContentType.JSON, type, source); + ActionRequestValidationException exception = indexRequest.validate(); + Assert.assertNull(exception); + + IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT); + Assert.assertNotNull(indexResponse); + if (indexResponse.status() == RestStatus.CREATED) { + logger.info(">>> Insert data created"); + } else { + logger.info(">>> Insert data ko :: " + indexResponse.status().name()); + } + Assert.assertEquals(indexResponse.status(), RestStatus.CREATED); + } + +} diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java index 239c321..27e5ced 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java @@ -17,6 +17,15 @@ package org.apache.unomi.plugins.baseplugin.conditions; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; import ognl.Node; import ognl.Ognl; import ognl.OgnlContext; @@ -24,23 +33,19 @@ import ognl.OgnlException; import ognl.enhance.ExpressionAccessor; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.unomi.api.*; +import org.apache.unomi.api.CustomItem; +import org.apache.unomi.api.Event; +import org.apache.unomi.api.Item; +import org.apache.unomi.api.Profile; +import org.apache.unomi.api.Session; import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.persistence.elasticsearch.conditions.ConditionContextHelper; import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluator; import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluatorDispatcher; import org.apache.unomi.persistence.spi.PropertyHelper; -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.common.joda.DateMathParser; -import org.elasticsearch.index.mapper.DateFieldMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.function.LongSupplier; -import java.util.regex.Pattern; - /** * Evaluator for property comparison conditions */ @@ -385,6 +390,7 @@ public class PropertyConditionEvaluator implements ConditionEvaluator { if (value == null) { return null; } + /* TODO implement ES7 if (value instanceof Date) { return ((Date) value); } else { @@ -399,6 +405,7 @@ public class PropertyConditionEvaluator implements ConditionEvaluator { logger.warn("unable to parse date " + value.toString(), e); } } + */ return null; } diff --git a/pom.xml b/pom.xml index a9968ab..45b0119 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ <version.karaf>4.2.7</version.karaf> <version.karaf.cellar>4.1.3</version.karaf.cellar> <version.pax.exam>4.13.1</version.pax.exam> - <elasticsearch.version>5.6.3</elasticsearch.version> + <elasticsearch.version>7.4.0</elasticsearch.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target>
