UNOMI 70 - Upgrade to ElasticSearch 5.0 - Upgrade to ElasticSearch 5.0.1 - Upgrade CXF to 3.0.2 - Upgrade to Karaf 3.0.8 - Replace Unomi clustering from ES clustering to Karaf Cellar clustering
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/96a018ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/96a018ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/96a018ec Branch: refs/heads/feature-UNOMI-70-ES5X Commit: 96a018ecb1b5b907930504e69f6da9357db8385e Parents: 90eb893 Author: Serge Huber <[email protected]> Authored: Mon Dec 5 10:43:29 2016 +0100 Committer: Serge Huber <[email protected]> Committed: Mon Dec 5 10:43:29 2016 +0100 ---------------------------------------------------------------------- extensions/geonames/rest/pom.xml | 5 - .../META-INF/cxs/mappings/geonameEntry.json | 42 +- extensions/lists-extension/rest/pom.xml | 5 - .../META-INF/cxs/mappings/userList.json | 23 +- extensions/privacy-extension/rest/pom.xml | 5 - itests/pom.xml | 41 +- kar/src/main/feature/feature.xml | 2 +- package/pom.xml | 36 +- persistence-elasticsearch/core/pom.xml | 115 +++-- .../ElasticSearchPersistenceServiceImpl.java | 490 ++++++++++--------- .../META-INF/cxs/mappings/actionType.json | 4 +- .../META-INF/cxs/mappings/campaign.json | 32 +- .../META-INF/cxs/mappings/campaignevent.json | 30 +- .../META-INF/cxs/mappings/conditionType.json | 66 ++- .../resources/META-INF/cxs/mappings/event.json | 319 +++++++++++- .../resources/META-INF/cxs/mappings/goal.json | 28 +- .../META-INF/cxs/mappings/persona.json | 22 +- .../META-INF/cxs/mappings/personaSession.json | 81 ++- .../META-INF/cxs/mappings/profile.json | 42 +- .../META-INF/cxs/mappings/propertyType.json | 45 +- .../resources/META-INF/cxs/mappings/rule.json | 21 +- .../META-INF/cxs/mappings/scoring.json | 25 +- .../META-INF/cxs/mappings/segment.json | 21 +- .../META-INF/cxs/mappings/session.json | 35 +- .../resources/OSGI-INF/blueprint/blueprint.xml | 26 +- .../core/src/main/resources/log4j2.xml | 29 ++ ...g.apache.unomi.persistence.elasticsearch.cfg | 7 +- .../plugin/security/SecurityPlugin.java | 34 +- .../plugin/security/SecurityPluginService.java | 7 +- .../persistence/spi/PersistenceService.java | 8 - .../BooleanConditionESQueryBuilder.java | 18 +- ...onByPointSessionConditionESQueryBuilder.java | 6 +- .../conditions/PropertyConditionEvaluator.java | 4 +- pom.xml | 31 +- rest/pom.xml | 5 - services/pom.xml | 10 - 36 files changed, 1132 insertions(+), 588 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/extensions/geonames/rest/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/geonames/rest/pom.xml b/extensions/geonames/rest/pom.xml index 72a8e74..ff52075 100644 --- a/extensions/geonames/rest/pom.xml +++ b/extensions/geonames/rest/pom.xml @@ -52,11 +52,6 @@ <dependency> <groupId>org.apache.cxf</groupId> - <artifactId>cxf-rt-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-frontend-jaxws</artifactId> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/extensions/geonames/services/src/main/resources/META-INF/cxs/mappings/geonameEntry.json ---------------------------------------------------------------------- diff --git a/extensions/geonames/services/src/main/resources/META-INF/cxs/mappings/geonameEntry.json b/extensions/geonames/services/src/main/resources/META-INF/cxs/mappings/geonameEntry.json index 63664fe..612f3ca 100644 --- a/extensions/geonames/services/src/main/resources/META-INF/cxs/mappings/geonameEntry.json +++ b/extensions/geonames/services/src/main/resources/META-INF/cxs/mappings/geonameEntry.json @@ -2,56 +2,46 @@ "geonameEntry": { "properties": { "admin1Code": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "admin2Code": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "admin3Code": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "admin4Code": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "alternatenames": { - "type": "string" + "type": "keyword" }, "asciiname": { - "type": "string" + "type": "keyword" }, "cc2": { - "type": "string" + "type": "keyword" }, "countryCode": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "dem": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "elevation": { "type": "long" }, "featureClass": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "featureCode": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "itemId": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "itemType": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "location": { "type": "geo_point" @@ -61,15 +51,13 @@ "format": "strict_date_optional_time||epoch_millis" }, "name": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "population": { "type": "long" }, "timezone": { - "type": "string", - "analyzer": "folding" + "type": "keyword" } } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/extensions/lists-extension/rest/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/lists-extension/rest/pom.xml b/extensions/lists-extension/rest/pom.xml index 0988a06..f2d31ad 100644 --- a/extensions/lists-extension/rest/pom.xml +++ b/extensions/lists-extension/rest/pom.xml @@ -53,11 +53,6 @@ <dependency> <groupId>org.apache.cxf</groupId> - <artifactId>cxf-rt-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-frontend-jaxws</artifactId> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/extensions/lists-extension/services/src/main/resources/META-INF/cxs/mappings/userList.json ---------------------------------------------------------------------- diff --git a/extensions/lists-extension/services/src/main/resources/META-INF/cxs/mappings/userList.json b/extensions/lists-extension/services/src/main/resources/META-INF/cxs/mappings/userList.json index 3bb8ae0..244d71a 100644 --- a/extensions/lists-extension/services/src/main/resources/META-INF/cxs/mappings/userList.json +++ b/extensions/lists-extension/services/src/main/resources/META-INF/cxs/mappings/userList.json @@ -4,27 +4,25 @@ { "all": { "match": "*", - "match_mapping_type": "string", + "match_mapping_type": "text", "mapping": { - "type": "string", - "analyzer": "folding" + "type": "text", + "analyzer" : "folding" } } } ], "properties": { "itemId": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "itemType": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "metadata": { "properties": { "description": { - "type": "string", + "type": "text", "analyzer": "folding" }, "enabled": { @@ -34,22 +32,19 @@ "type": "boolean" }, "id": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "missingPlugins": { "type": "boolean" }, "name": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "readOnly": { "type": "boolean" }, "scope": { - "type": "string", - "analyzer": "folding" + "type": "keyword" } } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/extensions/privacy-extension/rest/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/privacy-extension/rest/pom.xml b/extensions/privacy-extension/rest/pom.xml index 2bc7560..7c51644 100644 --- a/extensions/privacy-extension/rest/pom.xml +++ b/extensions/privacy-extension/rest/pom.xml @@ -52,11 +52,6 @@ <dependency> <groupId>org.apache.cxf</groupId> - <artifactId>cxf-rt-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-frontend-jaxws</artifactId> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/itests/pom.xml ---------------------------------------------------------------------- diff --git a/itests/pom.xml b/itests/pom.xml index 285a2f6..11e0a94 100644 --- a/itests/pom.xml +++ b/itests/pom.xml @@ -134,13 +134,52 @@ </executions> </plugin> <plugin> + <groupId>com.github.alexcojocaru</groupId> + <artifactId>elasticsearch-maven-plugin</artifactId> + <!-- REPLACE THE FOLLOWING WITH THE PLUGIN VERSION YOU NEED --> + <version>5.0</version> + <configuration> + <clusterName>contextElasticSearch</clusterName> + <tcpPort>9300</tcpPort> + <httpPort>9200</httpPort> + </configuration> + <executions> + <!-- + The elasticsearch maven plugin goals are by default bound to the + pre-integration-test and post-integration-test phases + --> + <execution> + <id>start-elasticsearch</id> + <phase>pre-integration-test</phase> + <goals> + <goal>runforked</goal> + </goals> + </execution> + <execution> + <id>stop-elasticsearch</id> + <phase>post-integration-test</phase> + <goals> + <goal>stop</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> + <artifactId>maven-failsafe-plugin</artifactId> <configuration> <includes> <include>**/*AllTests.java</include> </includes> </configuration> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/kar/src/main/feature/feature.xml ---------------------------------------------------------------------- diff --git a/kar/src/main/feature/feature.xml b/kar/src/main/feature/feature.xml index 5517a19..39413fb 100644 --- a/kar/src/main/feature/feature.xml +++ b/kar/src/main/feature/feature.xml @@ -20,7 +20,7 @@ <feature description="unomi-kar" version="${project.version}" name="unomi-kar" start-level="75"> <feature>war</feature> - <feature version="[2.7.11,3)">cxf</feature> + <feature>cxf</feature> <feature>openwebbeans</feature> <feature>pax-cdi-web-openwebbeans</feature> <configfile finalname="/etc/org.apache.unomi.web.cfg">mvn:org.apache.unomi/unomi-wab/${project.version}/cfg/unomicfg</configfile> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/package/pom.xml ---------------------------------------------------------------------- diff --git a/package/pom.xml b/package/pom.xml index 4a39217..6f622d9 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -64,14 +64,23 @@ <scope>runtime</scope> </dependency> + <!-- <dependency> - <!-- scope is runtime so the feature repo is listed in the features service config file, and features may be installed using the karaf-maven-plugin configuration --> <groupId>org.apache.cxf.karaf</groupId> <artifactId>apache-cxf</artifactId> <classifier>features</classifier> <type>xml</type> <scope>runtime</scope> </dependency> + --> + <dependency> + <groupId>org.apache.karaf.cellar</groupId> + <artifactId>apache-karaf-cellar</artifactId> + <classifier>features</classifier> + <type>xml</type> + <scope>runtime</scope> + </dependency> + <dependency> <groupId>org.apache.unomi</groupId> <artifactId>unomi-kar</artifactId> @@ -167,30 +176,8 @@ <artifactItems> <artifactItem> <groupId>org.elasticsearch.plugin</groupId> - <artifactId>delete-by-query</artifactId> - <version>${elasticsearch.version}</version> - <type>zip</type> - <outputDirectory>${project.build.directory}/assembly/elasticsearch/plugins/delete-by-query</outputDirectory> - </artifactItem> - <artifactItem> - <groupId>org.elasticsearch.module</groupId> - <artifactId>lang-groovy</artifactId> - <version>${elasticsearch.version}</version> - <type>zip</type> - <outputDirectory>${project.build.directory}/assembly/elasticsearch/modules/lang-groovy</outputDirectory> - </artifactItem> - <artifactItem> - <groupId>org.elasticsearch.module</groupId> - <artifactId>lang-expression</artifactId> - <version>${elasticsearch.version}</version> - <type>zip</type> - <outputDirectory>${project.build.directory}/assembly/elasticsearch/modules/lang-expression</outputDirectory> - </artifactItem> - <artifactItem> - <groupId>org.elasticsearch.module</groupId> - <artifactId>reindex</artifactId> + <artifactId>reindex-client</artifactId> <version>${elasticsearch.version}</version> - <type>zip</type> <outputDirectory>${project.build.directory}/assembly/elasticsearch/modules/reindex</outputDirectory> </artifactItem> </artifactItems> @@ -299,6 +286,7 @@ <feature>cxf</feature> <feature>openwebbeans</feature> <feature>pax-cdi-web-openwebbeans</feature> + <feature>cellar</feature> <feature>unomi-kar</feature> </bootFeatures> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/persistence-elasticsearch/core/pom.xml ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml index 6380219..6a05775 100644 --- a/persistence-elasticsearch/core/pom.xml +++ b/persistence-elasticsearch/core/pom.xml @@ -35,6 +35,11 @@ <artifactId>org.osgi.core</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.compendium</artifactId> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.unomi</groupId> @@ -49,14 +54,29 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.karaf.cellar</groupId> + <artifactId>org.apache.karaf.cellar.core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.karaf.cellar</groupId> + <artifactId>org.apache.karaf.cellar.config</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>20.0</version> + </dependency> + <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>${elasticsearch.version}</version> <scope>compile</scope> </dependency> <dependency> - <groupId>org.elasticsearch.plugin</groupId> - <artifactId>delete-by-query</artifactId> + <groupId>org.elasticsearch.client</groupId> + <artifactId>transport</artifactId> <version>${elasticsearch.version}</version> <scope>compile</scope> </dependency> @@ -64,41 +84,43 @@ <!-- The following are optional dependencies from the ElasticSearch that are made mandatory --> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> - <version>1.7.7</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>net.java.dev.jna</groupId> - <artifactId>jna</artifactId> - <version>4.1.0</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>com.github.spullara.mustache.java</groupId> - <artifactId>compiler</artifactId> - <version>0.8.13</version> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>2.6.2</version> <scope>compile</scope> + <optional>true</optional> </dependency> <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>2.8.1</version> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.6.2</version> <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </exclusion> + </exclusions> + <optional>true</optional> </dependency> <dependency> - <groupId>org.noggit</groupId> - <artifactId>noggit</artifactId> - <version>0.7</version> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-1.2-api</artifactId> + <version>2.6.2</version> <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + </exclusion> + </exclusions> + <optional>true</optional> </dependency> + <!-- End of optional ElasticSearch dependencies --> <dependency> @@ -169,6 +191,39 @@ sun.misc;resolution:=optional, sun.security.util;resolution:=optional, sun.security.x509;resolution:=optional, + com.google.protobuf.nano;resolution:=optional, + com.jcraft.jzlib;resolution:=optional, + com.ning.compress;resolution:=optional, + com.ning.compress.lzf;resolution:=optional, + com.ning.compress.lzf.util;resolution:=optional, + javassist;resolution:=optional, + lzma.sdk;resolution:=optional, + lzma.sdk.lzma;resolution:=optional, + net.jpountz.lz4;resolution:=optional, + net.jpountz.xxhash;resolution:=optional, + org.apache.tomcat;resolution:=optional, + org.eclipse.jetty.alpn;resolution:=optional, + org.joda.convert;resolution:=optional, + org.locationtech.spatial4j.context;resolution:=optional, + org.locationtech.spatial4j.context.jts;resolution:=optional, + org.locationtech.spatial4j.distance;resolution:=optional, + org.locationtech.spatial4j.exception;resolution:=optional, + org.locationtech.spatial4j.io;resolution:=optional, + org.locationtech.spatial4j.shape;resolution:=optional, + org.locationtech.spatial4j.shape.impl;resolution:=optional, + org.locationtech.spatial4j.shape.jts;resolution:=optional, + org.zeromq;resolution:=optional, + org.apache.commons.compress.compressors;resolution:=optional, + org.apache.commons.compress.utils;resolution:=optional, + org.apache.commons.csv;resolution:=optional, + org.apache.kafka.clients.producer;resolution:=optional, + javax.persistence;resolution:=optional, + com.google.errorprone.annotations.concurrent;resolution:=optional, + com.lmax.disruptor;resolution:=optional, + com.lmax.disruptor.dsl;resolution:=optional, + com.fasterxml.jackson.dataformat.xml;resolution:=optional, + com.fasterxml.jackson.dataformat.xml.annotation;resolution:=optional, + com.fasterxml.jackson.dataformat.xml.util;resolution:=optional, * </Import-Package> <Export-Package> @@ -176,7 +231,7 @@ org.elasticsearch.index.query.*;version="${elasticsearch.version}", org.apache.unomi.persistence.elasticsearch.conditions;version="${project.version}" </Export-Package> - <Embed-Dependency>*;scope=compile|runtime;artifactId=!log4j</Embed-Dependency> + <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency> <Embed-Transitive>true</Embed-Transitive> </instructions> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 794b03b..0dc3899 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -17,7 +17,13 @@ package org.apache.unomi.persistence.elasticsearch; -import com.google.common.collect.UnmodifiableIterator; +import org.apache.commons.lang3.StringUtils; +import org.apache.karaf.cellar.config.ClusterConfigurationEvent; +import org.apache.karaf.cellar.config.Constants; +import org.apache.karaf.cellar.core.*; +import org.apache.karaf.cellar.core.control.SwitchStatus; +import org.apache.karaf.cellar.core.event.EventProducer; +import org.apache.karaf.cellar.core.event.EventType; import org.apache.unomi.api.ClusterNode; import org.apache.unomi.api.Item; import org.apache.unomi.api.PartialList; @@ -31,33 +37,22 @@ import org.apache.unomi.persistence.elasticsearch.conditions.*; import org.apache.unomi.persistence.spi.CustomObjectMapper; import org.apache.unomi.persistence.spi.PersistenceService; import org.apache.unomi.persistence.spi.aggregate.*; -import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.bulk.*; -import org.elasticsearch.action.deletebyquery.DeleteByQueryAction; -import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; -import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; -import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -67,7 +62,6 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; -import org.elasticsearch.node.Node; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHit; @@ -77,91 +71,86 @@ import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.global.Global; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.search.aggregations.bucket.missing.MissingBuilder; -import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder; -import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeBuilder; -import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4RangeBuilder; +import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.osgi.framework.BundleContext; import org.osgi.framework.BundleEvent; import org.osgi.framework.ServiceReference; import org.osgi.framework.SynchronousBundleListener; +import org.osgi.service.cm.ConfigurationAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.*; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; import java.io.BufferedReader; -import java.io.File; import java.io.IOException; import java.io.InputStreamReader; -import java.net.*; -import java.nio.file.Paths; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.lang.management.RuntimeMXBean; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.node.NodeBuilder.nodeBuilder; - @SuppressWarnings("rawtypes") public class ElasticSearchPersistenceServiceImpl implements PersistenceService, ClusterService, SynchronousBundleListener { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); - public static final String DISCOVERY_ZEN_PING_MULTICAST_ENABLED = "discovery.zen.ping.multicast.enabled"; public static final String CONTEXTSERVER_ADDRESS = "contextserver.address"; public static final String CONTEXTSERVER_PORT = "contextserver.port"; public static final String CONTEXTSERVER_SECURE_ADDRESS = "contextserver.secureAddress"; public static final String CONTEXTSERVER_SECURE_PORT = "contextserver.securePort"; - public static final String KARAF_HOME = "karaf.home"; - public static final String ELASTICSEARCH_HOME_DIRECTORY = "elasticsearch"; - public static final String ELASTICSEARCH_PLUGINS_DIRECTORY = ELASTICSEARCH_HOME_DIRECTORY + "/plugins"; - public static final String ELASTICSEARCH_DATA_DIRECTORY = ELASTICSEARCH_HOME_DIRECTORY + "/data"; - public static final String INDEX_NUMBER_OF_REPLICAS = "index.number_of_replicas"; - public static final String INDEX_NUMBER_OF_SHARDS = "index.number_of_shards"; - public static final String NODE_CONTEXTSERVER_ADDRESS = "node.contextserver.address"; - public static final String NODE_CONTEXTSERVER_PORT = "node.contextserver.port"; - public static final String NODE_CONTEXTSERVER_SECURE_ADDRESS = "node.contextserver.secureAddress"; - public static final String NODE_CONTEXTSERVER_SECURE_PORT = "node.contextserver.securePort"; public static final String NUMBER_OF_SHARDS = "number_of_shards"; public static final String NUMBER_OF_REPLICAS = "number_of_replicas"; public static final String CLUSTER_NAME = "cluster.name"; - public static final String NODE_DATA = "node.data"; - public static final String PATH_DATA = "path.data"; - public static final String PATH_HOME = "path.home"; - public static final String PATH_PLUGINS = "path.plugins"; - public static final String INDEX_MAX_RESULT_WINDOW = "index.max_result_window"; - public static final String MAPPER_ALLOW_DOTS_IN_NAME = "mapper.allow_dots_in_name"; public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name"; public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests"; public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions"; public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize"; public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval"; public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy"; - public static final String ELASTICSEARCH_NETWORK_HOST = "network.host"; + public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION = "org.apache.unoni.nodes"; + public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS = "publicEndpoints"; + public static final String KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS = "secureEndpoints"; - private Node node; - private Client nodeClient; private Client client; private BulkProcessor bulkProcessor; private String clusterName; private String indexName; private String monthlyIndexNumberOfShards; private String monthlyIndexNumberOfReplicas; - private String numberOfShards; - private String numberOfReplicas; - private Boolean nodeData; - private Boolean discoveryEnabled; private String elasticSearchConfig = null; private BundleContext bundleContext; private Map<String, String> mappings = new HashMap<String, String>(); private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher; private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher; + private ClusterManager karafCellarClusterManager; + private EventProducer karafCellarEventProducer; + private GroupManager karafCellarGroupManager; + private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME; + private ConfigurationAdmin osgiConfigurationAdmin; + private String karafJMXUsername = "karaf"; + private String karafJMXPassword = "karaf"; + private int karafJMXPort = 1099; private Map<String,String> indexNames; private List<String> itemsMonthlyIndexed; @@ -203,22 +192,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.monthlyIndexNumberOfReplicas = monthlyIndexNumberOfReplicas; } - public void setDiscoveryEnabled(Boolean discoveryEnabled) { - this.discoveryEnabled = discoveryEnabled; - } - - public void setNumberOfShards(String numberOfShards) { - this.numberOfShards = numberOfShards; - } - - public void setNumberOfReplicas(String numberOfReplicas) { - this.numberOfReplicas = numberOfReplicas; - } - - public void setNodeData(Boolean nodeData) { - this.nodeData = nodeData; - } - public void setAddress(String address) { this.address = address; } @@ -287,6 +260,38 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy; } + public void setKarafCellarClusterManager(ClusterManager karafCellarClusterManager) { + this.karafCellarClusterManager = karafCellarClusterManager; + } + + public void setKarafCellarEventProducer(EventProducer karafCellarEventProducer) { + this.karafCellarEventProducer = karafCellarEventProducer; + } + + public void setKarafCellarGroupManager(GroupManager karafCellarGroupManager) { + this.karafCellarGroupManager = karafCellarGroupManager; + } + + public void setKarafCellarGroupName(String karafCellarGroupName) { + this.karafCellarGroupName = karafCellarGroupName; + } + + public void setOsgiConfigurationAdmin(ConfigurationAdmin osgiConfigurationAdmin) { + this.osgiConfigurationAdmin = osgiConfigurationAdmin; + } + + public void setKarafJMXUsername(String karafJMXUsername) { + this.karafJMXUsername = karafJMXUsername; + } + + public void setKarafJMXPassword(String karafJMXPassword) { + this.karafJMXPassword = karafJMXPassword; + } + + public void setKarafJMXPort(int karafJMXPort) { + this.karafJMXPort = karafJMXPort; + } + public void start() { loadPredefinedMappings(bundleContext, false); @@ -294,29 +299,58 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // on startup new InClassLoaderExecute<Object>() { public Object execute(Object... args) { - logger.info("Starting ElasticSearch persistence backend using cluster name " + clusterName + " and index name " + indexName + "..."); - Map<String, String> settings = null; - if (elasticSearchConfig != null && elasticSearchConfig.length() > 0) { - try { - URI elasticSearchConfigURI = new URI(elasticSearchConfig); - Settings.Builder settingsBuilder = Settings.builder().loadFromPath(Paths.get(elasticSearchConfigURI)); - settings = settingsBuilder.build().getAsMap(); - logger.info("Successfully loaded ElasticSearch configuration from " + elasticSearchConfigURI); - } catch (URISyntaxException e) { - logger.error("Error in ElasticSearch configuration URI ", e); - } catch (SettingsException se) { - logger.info("Error trying to load settings from " + elasticSearchConfig + ": " + se.getMessage() + " (activate debug mode for exception details)"); - if (logger.isDebugEnabled()) { - logger.debug("Exception details", se); - } - } - } + logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index name " + indexName + "..."); address = System.getProperty(CONTEXTSERVER_ADDRESS, address); port = System.getProperty(CONTEXTSERVER_PORT, port); secureAddress = System.getProperty(CONTEXTSERVER_SECURE_ADDRESS, secureAddress); securePort = System.getProperty(CONTEXTSERVER_SECURE_PORT, securePort); + if (karafCellarEventProducer != null && karafCellarClusterManager != null) { + + boolean setupConfigOk = true; + Group group = karafCellarGroupManager.findGroupByName(karafCellarGroupName); + if (setupConfigOk && group == null) { + logger.error("Cluster group " + karafCellarGroupName + " doesn't exist"); + setupConfigOk = false; + } + + // check if the producer is ON + if (setupConfigOk && karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) { + logger.error("Cluster event producer is OFF"); + setupConfigOk = false; + } + + // check if the config pid is allowed + if (setupConfigOk && !isClusterConfigPIDAllowed(group, Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, EventType.OUTBOUND)) { + logger.error("Configuration PID " + KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster group " + karafCellarGroupName); + setupConfigOk = false; + } + + if (setupConfigOk) { + Map<String, Properties> configurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName); + org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode(); + Properties karafCellarClusterNodeConfiguration = configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION); + if (karafCellarClusterNodeConfiguration == null) { + karafCellarClusterNodeConfiguration = new Properties(); + } + String publicEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + address + ":" + port); + String secureEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS, thisKarafNode.getId() + "=" + secureAddress + ":" + securePort); + String[] publicEndpointsArray = publicEndpointsPropValue.split(","); + Set<String> publicEndpoints = new TreeSet<String>(Arrays.asList(publicEndpointsArray)); + String[] secureEndpointsArray = secureEndpointsPropValue.split(","); + Set<String> secureEndpoints = new TreeSet<String>(Arrays.asList(secureEndpointsArray)); + publicEndpoints.add(thisKarafNode.getId() + "=" + address + ":" + port); + secureEndpoints.add(thisKarafNode.getId() + "=" + secureAddress + ":" + port); + karafCellarClusterNodeConfiguration.setProperty(KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, StringUtils.join(publicEndpoints, ",")); + karafCellarClusterNodeConfiguration.setProperty(KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS, StringUtils.join(secureEndpoints, ",")); + configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarClusterNodeConfiguration); + ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION); + clusterConfigurationEvent.setSourceGroup(group); + karafCellarEventProducer.produce(clusterConfigurationEvent); + } + } + bulkProcessorName = System.getProperty(BULK_PROCESSOR_NAME, bulkProcessorName); bulkProcessorConcurrentRequests = System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS, bulkProcessorConcurrentRequests); bulkProcessorBulkActions = System.getProperty(BULK_PROCESSOR_BULK_ACTIONS, bulkProcessorBulkActions); @@ -324,57 +358,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, bulkProcessorFlushInterval = System.getProperty(BULK_PROCESSOR_FLUSH_INTERVAL, bulkProcessorFlushInterval); bulkProcessorBackoffPolicy = System.getProperty(BULK_PROCESSOR_BACKOFF_POLICY, bulkProcessorBackoffPolicy); - Settings.Builder settingsBuilder = Settings.builder(); - if (settings != null) { - settingsBuilder.put(settings); - } - - String karafHome = System.getProperty(KARAF_HOME); - File pluginsFile = new File(getConfig(settings, PATH_PLUGINS, new File(new File(karafHome), ELASTICSEARCH_PLUGINS_DIRECTORY).getAbsolutePath())); - File homeFile = new File(getConfig(settings, PATH_HOME, new File(new File(karafHome), ELASTICSEARCH_HOME_DIRECTORY).getAbsolutePath())); - File dataFile = new File(getConfig(settings, PATH_DATA, new File(new File(karafHome), ELASTICSEARCH_DATA_DIRECTORY).getAbsolutePath())); - - // allow dots in mappings (re-introduced in ElasticSearch 2.4.0) - System.setProperty(MAPPER_ALLOW_DOTS_IN_NAME, "true"); - - settingsBuilder.put(CLUSTER_NAME, clusterName) - .put(NODE_DATA, nodeData) - .put(PATH_DATA, dataFile.getAbsolutePath()) - .put(PATH_HOME, homeFile.getAbsolutePath()) - .put(PATH_PLUGINS, pluginsFile.getAbsolutePath()) - .put(DISCOVERY_ZEN_PING_MULTICAST_ENABLED, discoveryEnabled) - .put(INDEX_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(INDEX_NUMBER_OF_SHARDS, numberOfShards) - .put(NODE_CONTEXTSERVER_ADDRESS, address) - .put(NODE_CONTEXTSERVER_PORT, port) - .put(NODE_CONTEXTSERVER_SECURE_ADDRESS, secureAddress) - .put(NODE_CONTEXTSERVER_SECURE_PORT, securePort) - .put(INDEX_MAX_RESULT_WINDOW, "2147483647"); - - if (settingsBuilder.get(ELASTICSEARCH_NETWORK_HOST) == null) { - logger.info("Setting ElasticSearch network host address to {}", address); - settingsBuilder.put(ELASTICSEARCH_NETWORK_HOST, address); - } - - node = nodeBuilder().settings(settingsBuilder).node(); - nodeClient = node.client(); - - logger.info("Waiting for ElasticSearch to start..."); - - nodeClient.admin().cluster().prepareHealth() - .setWaitForGreenStatus() - .get(); - - logger.info("Cluster status is GREEN"); - try { - Settings transportSettings = Settings.settingsBuilder() + Settings transportSettings = Settings.builder() .put(CLUSTER_NAME, clusterName).build(); - client = TransportClient.builder().settings(transportSettings).build() + client = new PreBuiltTransportClient(transportSettings) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(address), 9300)); } catch (UnknownHostException e) { logger.error("Error resolving address " + address + " ElasticSearch transport client not connected, using internal client instead", e); - client = nodeClient; } // @todo is there a better way to detect index existence than to wait for it to startup ? @@ -412,7 +402,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, client.admin().indices().preparePutTemplate(indexName + "_monthlyindex") .setTemplate(indexName + "-*") .setOrder(1) - .setSettings(Settings.settingsBuilder() + .setSettings(Settings.builder() .put(NUMBER_OF_SHARDS, Integer.parseInt(monthlyIndexNumberOfShards)) .put(NUMBER_OF_REPLICAS, Integer.parseInt(monthlyIndexNumberOfReplicas)) .build()).execute().actionGet(); @@ -582,11 +572,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.error("Error waiting for bulk operations to flush !", e); } } - if (nodeClient != client) { + if (client != null) { client.close(); } - nodeClient.close(); - node.close(); return null; } }.executeInClassLoader(); @@ -711,7 +699,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String itemType = (String) clazz.getField("ITEM_TYPE").get(null); if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) { - PartialList<T> r = query(QueryBuilders.idsQuery(itemType).ids(itemId), null, clazz, 0, 1, null); + PartialList<T> r = query(QueryBuilders.idsQuery(itemType).addIds(itemId), null, clazz, 0, 1, null); if (r.size() > 0) { return r.get(0); } @@ -886,11 +874,39 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, try { String itemType = (String) clazz.getField("ITEM_TYPE").get(null); - DeleteByQueryResponse rsp = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + BulkRequestBuilder deleteByScope = client.prepareBulk(); + + final TimeValue keepAlive = TimeValue.timeValueHours(1); + SearchResponse response = client.prepareSearch(indexName + "*") .setIndices(getIndexNameForQuery(itemType)) + .setScroll(keepAlive) .setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query)) - .execute() - .actionGet(); + .setSize(100).execute().actionGet(); + + // Scroll until no more hits are returned + while (true) { + + for (SearchHit hit : response.getHits().getHits()) { + // add hit to bulk delete + deleteByScope.add(Requests.deleteRequest(hit.index()).type(hit.type()).id(hit.id())); + } + + response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet(); + + // If we have no more hits, exit + if (response.getHits().getHits().length == 0) { + break; + } + } + + // we're done with the scrolling, delete now + if (deleteByScope.numberOfActions() > 0) { + final BulkResponse deleteResponse = deleteByScope.get(); + if (deleteResponse.hasFailures()) { + // do something + logger.debug("Couldn't remove by query " + query + ":\n{}", deleteResponse.buildFailureMessage()); + } + } return true; } catch (Exception e) { @@ -995,7 +1011,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getMappingsResponse.getMappings(); Map<String, Map<String, Object>> propertyMap = new HashMap<>(); try { - UnmodifiableIterator<ImmutableOpenMap<String, MappingMetaData>> it = mappings.valuesIt(); + Iterator<ImmutableOpenMap<String, MappingMetaData>> it = mappings.valuesIt(); while (it.hasNext()) { ImmutableOpenMap<String, MappingMetaData> next = it.next(); Map<String, Map<String, Object>> properties = (Map<String, Map<String, Object>>) next.get(itemType).getSourceAsMap().get("properties"); @@ -1030,7 +1046,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.info("Saving query : " + queryName); client.prepareIndex(indexName, ".percolator", queryName) .setSource(query) - .setRefresh(true) // Needed when the query shall be available immediately + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .execute().actionGet(); return true; } catch (Exception e) { @@ -1057,7 +1073,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, //Index the query = register it in the percolator try { client.prepareDelete(indexName, ".percolator", queryName) - .setRefresh(true) // Needed when the query shall be available immediately + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .execute().actionGet(); return true; } catch (Exception e) { @@ -1069,36 +1085,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } @Override - public List<String> getMatchingSavedQueries(final Item item) { - return new InClassLoaderExecute<List<String>>() { - protected List<String> execute(Object... args) { - List<String> matchingQueries = new ArrayList<String>(); - try { - String source = CustomObjectMapper.getObjectMapper().writeValueAsString(item); - - String itemType = item.getItemType(); - - //Percolate - PercolateResponse response = client.preparePercolate() - .setIndices(indexName) - .setDocumentType(itemType) - .setSource("{doc:" + source + "}").execute().actionGet(); - //Iterate over the results - for (PercolateResponse.Match match : response) { - //Handle the result which is the name of - //the query in the percolator - matchingQueries.add(match.getId().string()); - } - } catch (IOException e) { - logger.error("Error getting matching saved queries for item=" + item, e); - } - return matchingQueries; - } - }.executeInClassLoader(); - - } - - @Override public boolean testMatch(Condition query, Item item) { try { return conditionEvaluatorDispatcher.eval(query, item); @@ -1109,9 +1095,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, final Class<? extends Item> clazz = item.getClass(); String itemType = (String) clazz.getField("ITEM_TYPE").get(null); - QueryBuilder builder = QueryBuilders.andQuery( - QueryBuilders.idsQuery(itemType).ids(item.getItemId()), - conditionESQueryBuilderDispatcher.buildFilter(query)); + QueryBuilder builder = QueryBuilders.boolQuery() + .must(QueryBuilders.idsQuery(itemType).addIds(item.getItemId())) + .must(conditionESQueryBuilderDispatcher.buildFilter(query)); return queryCount(builder, itemType) > 0; } catch (IllegalAccessException e) { logger.error("Error getting query for item=" + item, e); @@ -1181,9 +1167,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected Long execute(Object... args) { SearchResponse response = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) - .setSearchType(SearchType.COUNT) + .setSize(0) .setQuery(QueryBuilders.matchAllQuery()) - .addAggregation(AggregationBuilders.filter("filter").filter(filter)) + .addAggregation(AggregationBuilders.filter("filter", filter)) .execute() .actionGet(); Aggregations searchHits = response.getAggregations(); @@ -1213,7 +1199,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } else if (size != -1) { requestBuilder.setSize(size); } else { - requestBuilder.setSize(Integer.MAX_VALUE); + // requestBuilder.setSize(Integer.MAX_VALUE); } if (routing != null) { requestBuilder.setRouting(routing); @@ -1223,7 +1209,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (String sortByElement : sortByArray) { if (sortByElement.startsWith("geo:")) { String[] elements = sortByElement.split(":"); - GeoDistanceSortBuilder distanceSortBuilder = SortBuilders.geoDistanceSort(elements[1]).point(Double.parseDouble(elements[2]), Double.parseDouble(elements[3])).unit(DistanceUnit.KILOMETERS); + GeoDistanceSortBuilder distanceSortBuilder = SortBuilders.geoDistanceSort(elements[1], Double.parseDouble(elements[2]), Double.parseDouble(elements[3])).unit(DistanceUnit.KILOMETERS); if (elements.length > 4 && elements[4].equals("desc")) { requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.DESC)); } else { @@ -1270,7 +1256,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) - .setSearchType(SearchType.COUNT) + .setSize(0) .setQuery(QueryBuilders.matchAllQuery()); List<AggregationBuilder> lastAggregation = new ArrayList<AggregationBuilder>(); @@ -1279,13 +1265,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, AggregationBuilder bucketsAggregation = null; if (aggregate instanceof DateAggregate) { DateAggregate dateAggregate = (DateAggregate) aggregate; - DateHistogramBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(aggregate.getField()).interval(new DateHistogramInterval((dateAggregate.getInterval()))); + DateHistogramAggregationBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(aggregate.getField()).dateHistogramInterval(new DateHistogramInterval((dateAggregate.getInterval()))); if (dateAggregate.getFormat() != null) { dateHistogramBuilder.format(dateAggregate.getFormat()); } bucketsAggregation = dateHistogramBuilder; } else if (aggregate instanceof NumericRangeAggregate) { - RangeBuilder rangebuilder = AggregationBuilders.range("buckets").field(aggregate.getField()); + RangeAggregationBuilder rangebuilder = AggregationBuilders.range("buckets").field(aggregate.getField()); for (NumericRange range : ((NumericRangeAggregate) aggregate).getRanges()) { if (range != null) { if (range.getFrom() != null && range.getTo() != null) { @@ -1300,19 +1286,19 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, bucketsAggregation = rangebuilder; } else if (aggregate instanceof DateRangeAggregate) { DateRangeAggregate dateRangeAggregate = (DateRangeAggregate) aggregate; - DateRangeBuilder rangebuilder = AggregationBuilders.dateRange("buckets").field(aggregate.getField()); + DateRangeAggregationBuilder rangebuilder = AggregationBuilders.dateRange("buckets").field(aggregate.getField()); if (dateRangeAggregate.getFormat() != null) { rangebuilder.format(dateRangeAggregate.getFormat()); } for (DateRange range : dateRangeAggregate.getDateRanges()) { if (range != null) { - rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo()); + rangebuilder.addRange(range.getKey(), range.getFrom().toString(), range.getTo().toString()); } } bucketsAggregation = rangebuilder; } else if (aggregate instanceof IpRangeAggregate) { IpRangeAggregate ipRangeAggregate = (IpRangeAggregate) aggregate; - IPv4RangeBuilder rangebuilder = AggregationBuilders.ipRange("buckets").field(aggregate.getField()); + IpRangeAggregationBuilder rangebuilder = AggregationBuilders.ipRange("buckets").field(aggregate.getField()); for (IpRange range : ipRangeAggregate.getRanges()) { if (range != null) { rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo()); @@ -1321,10 +1307,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, bucketsAggregation = rangebuilder; } else { //default - bucketsAggregation = AggregationBuilders.terms("buckets").field(aggregate.getField()).size(Integer.MAX_VALUE); + bucketsAggregation = AggregationBuilders.terms("buckets").field(aggregate.getField()).size(5000); } if (bucketsAggregation != null) { - final MissingBuilder missingBucketsAggregation = AggregationBuilders.missing("missing").field(aggregate.getField()); + final MissingAggregationBuilder missingBucketsAggregation = AggregationBuilders.missing("missing").field(aggregate.getField()); for (AggregationBuilder aggregationBuilder : lastAggregation) { bucketsAggregation.subAggregation(aggregationBuilder); missingBucketsAggregation.subAggregation(aggregationBuilder); @@ -1334,7 +1320,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } if (filter != null) { - AggregationBuilder filterAggregation = AggregationBuilders.filter("filter").filter(conditionESQueryBuilderDispatcher.buildFilter(filter)); + AggregationBuilder filterAggregation = AggregationBuilders.filter("filter", conditionESQueryBuilderDispatcher.buildFilter(filter)); for (AggregationBuilder aggregationBuilder : lastAggregation) { filterAggregation.subAggregation(aggregationBuilder); } @@ -1408,47 +1394,87 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, protected List<ClusterNode> execute(Object... args) { Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, ClusterNode>(); - NodesInfoResponse nodesInfoResponse = client.admin().cluster().prepareNodesInfo(NodesInfoRequest.ALL_NODES) - .setSettings(true) - .execute() - .actionGet(); - NodeInfo[] nodesInfoArray = nodesInfoResponse.getNodes(); - for (NodeInfo nodeInfo : nodesInfoArray) { - if (nodeInfo.getSettings().get("node.contextserver.address") != null) { - ClusterNode clusterNode = new ClusterNode(); - clusterNode.setHostName(nodeInfo.getHostname()); - clusterNode.setHostAddress(nodeInfo.getSettings().get("node.contextserver.address")); - clusterNode.setPublicPort(Integer.parseInt(nodeInfo.getSettings().get("node.contextserver.port"))); - clusterNode.setSecureHostAddress(nodeInfo.getSettings().get("node.contextserver.secureAddress")); - clusterNode.setSecurePort(Integer.parseInt(nodeInfo.getSettings().get("node.contextserver.securePort"))); - clusterNode.setMaster(nodeInfo.getNode().isMasterNode()); - clusterNode.setData(nodeInfo.getNode().isDataNode()); - clusterNodes.put(nodeInfo.getNode().getId(), clusterNode); + Set<org.apache.karaf.cellar.core.Node> karafCellarNodes = karafCellarClusterManager.listNodes(); + org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode(); + Map<String, Properties> clusterConfigurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName); + Properties karafCellarClusterNodeConfiguration = clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION); + Map<String, String> publicNodeEndpoints = new TreeMap<>(); + Map<String, String> secureNodeEndpoints = new TreeMap<>(); + if (karafCellarClusterNodeConfiguration != null) { + String publicEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + address + ":" + port); + String secureEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS, thisKarafNode.getId() + "=" + secureAddress + ":" + securePort); + String[] publicEndpointsArray = publicEndpointsPropValue.split(","); + Set<String> publicEndpoints = new TreeSet<String>(Arrays.asList(publicEndpointsArray)); + for (String endpoint : publicEndpoints) { + String[] endpointParts = endpoint.split("="); + publicNodeEndpoints.put(endpointParts[0], endpointParts[1]); + } + String[] secureEndpointsArray = secureEndpointsPropValue.split(","); + Set<String> secureEndpoints = new TreeSet<String>(Arrays.asList(secureEndpointsArray)); + for (String endpoint : secureEndpoints) { + String[] endpointParts = endpoint.split("="); + secureNodeEndpoints.put(endpointParts[0], endpointParts[1]); } } - - NodesStatsResponse nodesStatsResponse = client.admin().cluster().prepareNodesStats(NodesInfoRequest.ALL_NODES) - .setOs(true) - .setJvm(true) - .setProcess(true) - .execute() - .actionGet(); - NodeStats[] nodeStatsArray = nodesStatsResponse.getNodes(); - for (NodeStats nodeStats : nodeStatsArray) { - ClusterNode clusterNode = clusterNodes.get(nodeStats.getNode().getId()); - if (clusterNode != null) { - // the following may be null in the case where Sigar didn't initialize properly, for example - // because the native libraries were not installed or if we redeployed the OSGi bundle in which - // case Sigar cannot initialize properly since it tries to reload the native libraries, generates - // an error and doesn't initialize properly. - if (nodeStats.getProcess() != null && nodeStats.getProcess().getCpu() != null) { - clusterNode.setCpuLoad(nodeStats.getProcess().getCpu().getPercent()); + for (org.apache.karaf.cellar.core.Node karafCellarNode : karafCellarNodes) { + ClusterNode clusterNode = new ClusterNode(); + clusterNode.setHostName(karafCellarNode.getHost()); + String publicEndpoint = publicNodeEndpoints.get(karafCellarNode.getId()); + String[] publicEndpointParts = publicEndpoint.split(":"); + clusterNode.setHostAddress(publicEndpointParts[0]); + clusterNode.setPublicPort(Integer.parseInt(publicEndpointParts[1])); + String secureEndpoint = secureNodeEndpoints.get(karafCellarNode.getId()); + String[] secureEndpointParts = secureEndpoint.split(":"); + clusterNode.setSecureHostAddress(secureEndpointParts[0]); + clusterNode.setSecurePort(Integer.parseInt(secureEndpointParts[1])); + clusterNode.setMaster(false); + clusterNode.setData(false); + try { + // now let's connect to remote JMX service to retrieve information from the runtime and operating system MX beans + JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"+karafCellarNode.getHost() + ":"+karafJMXPort+"/karaf-root"); + Map<String,Object> environment=new HashMap<String,Object>(); + if (karafJMXUsername != null && karafJMXPassword != null) { + environment.put(JMXConnector.CREDENTIALS,new String[]{karafJMXUsername,karafJMXPassword}); } - if (nodeStats.getOs() != null) { - clusterNode.setLoadAverage(new double[] { nodeStats.getOs().getLoadAverage() }); - clusterNode.setUptime(nodeStats.getJvm().getUptime().getMillis()); + JMXConnector jmxc = JMXConnectorFactory.connect(url, environment); + MBeanServerConnection mbsc = jmxc.getMBeanServerConnection(); + final RuntimeMXBean remoteRuntime = ManagementFactory.newPlatformMXBeanProxy(mbsc, ManagementFactory.RUNTIME_MXBEAN_NAME, RuntimeMXBean.class); + clusterNode.setUptime(remoteRuntime.getUptime()); + ObjectName operatingSystemMXBeanName = new ObjectName(ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME); + Double processCpuLoad = null; + Double systemCpuLoad = null; + try { + processCpuLoad = (Double) mbsc.getAttribute(operatingSystemMXBeanName, "ProcessCpuLoad"); + } catch (MBeanException e) { + e.printStackTrace(); + } catch (AttributeNotFoundException e) { + e.printStackTrace(); + } + try { + systemCpuLoad = (Double) mbsc.getAttribute(operatingSystemMXBeanName, "SystemCpuLoad"); + } catch (MBeanException e) { + e.printStackTrace(); + } catch (AttributeNotFoundException e) { + e.printStackTrace(); } + final OperatingSystemMXBean remoteOperatingSystemMXBean = ManagementFactory.newPlatformMXBeanProxy(mbsc, ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME, OperatingSystemMXBean.class); + clusterNode.setLoadAverage(new double[] { remoteOperatingSystemMXBean.getSystemLoadAverage()}); + if (systemCpuLoad != null) { + clusterNode.setCpuLoad(systemCpuLoad); + } + + } catch (MalformedURLException e) { + logger.error("Error connecting to remote JMX server", e); + } catch (IOException e) { + logger.error("Error retrieving remote JMX data", e); + } catch (MalformedObjectNameException e) { + logger.error("Error retrieving remote JMX data", e); + } catch (InstanceNotFoundException e) { + logger.error("Error retrieving remote JMX data", e); + } catch (ReflectionException e) { + logger.error("Error retrieving remote JMX data", e); } + clusterNodes.put(karafCellarNode.getId(), clusterNode); } return new ArrayList<ClusterNode>(clusterNodes.values()); @@ -1486,7 +1512,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .setFlush(false) .setCompletion(false) .setRefresh(false) - .setSuggest(false) .execute() .actionGet(); @@ -1525,7 +1550,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, final TimeValue keepAlive = TimeValue.timeValueHours(1); SearchResponse response = client.prepareSearch(indexName + "*") - .setSearchType(SearchType.SCAN) .setScroll(keepAlive) .setQuery(query) .setSize(100).execute().actionGet(); @@ -1570,9 +1594,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) - .setSearchType(SearchType.COUNT) + .setSize(0) .setQuery(QueryBuilders.matchAllQuery()); - AggregationBuilder filterAggregation = AggregationBuilders.filter("metrics").filter(conditionESQueryBuilderDispatcher.buildFilter(condition)); + AggregationBuilder filterAggregation = AggregationBuilders.filter("metrics", conditionESQueryBuilderDispatcher.buildFilter(condition)); if (metrics != null) { for (String metric : metrics) { @@ -1638,4 +1662,22 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return defaultValue; } + + /** + * Check if a configuration is allowed. + * + * @param group the cluster group. + * @param category the configuration category constant. + * @param pid the configuration PID. + * @param type the cluster event type. + * @return true if the cluster event type is allowed, false else. + */ + public boolean isClusterConfigPIDAllowed(Group group, String category, String pid, EventType type) { + CellarSupport support = new CellarSupport(); + support.setClusterManager(this.karafCellarClusterManager); + support.setGroupManager(this.karafCellarGroupManager); + support.setConfigurationAdmin(this.osgiConfigurationAdmin); + return support.isAllowed(group, category, pid, type); + } + } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/actionType.json ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/actionType.json b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/actionType.json index 9470350..fa2f0ea 100644 --- a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/actionType.json +++ b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/actionType.json @@ -4,9 +4,9 @@ { "all": { "match": "*", - "match_mapping_type": "string", + "match_mapping_type": "text", "mapping": { - "type": "string", + "type": "text", "analyzer": "folding" } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaign.json ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaign.json b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaign.json index 36788d7..efdcb3c 100644 --- a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaign.json +++ b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaign.json @@ -4,9 +4,9 @@ { "all": { "match": "*", - "match_mapping_type": "string", + "match_mapping_type": "text", "mapping": { - "type": "string", + "type": "text", "analyzer": "folding" } } @@ -17,8 +17,7 @@ "type": "double" }, "currency": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "startDate": { "type": "date", @@ -29,25 +28,21 @@ "format": "strict_date_optional_time||epoch_millis" }, "itemId": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "itemType": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "primaryGoal": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "timezone": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "metadata": { "properties": { "description": { - "type": "string", + "type": "text", "analyzer": "folding" }, "enabled": { @@ -57,25 +52,22 @@ "type": "boolean" }, "id": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "missingPlugins": { "type": "boolean" }, "name": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "readOnly": { "type": "boolean" }, "scope": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "tags": { - "type": "string", + "type": "text", "analyzer": "folding" } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaignevent.json ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaignevent.json b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaignevent.json index ccc001e..f682e6a 100644 --- a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaignevent.json +++ b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/campaignevent.json @@ -4,9 +4,9 @@ { "all": { "match": "*", - "match_mapping_type": "string", + "match_mapping_type": "text", "mapping": { - "type": "string", + "type": "text", "analyzer": "folding" } } @@ -14,36 +14,31 @@ ], "properties": { "campaignId": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "cost": { "type": "double" }, "currency": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "eventDate": { "type": "date", "format": "strict_date_optional_time||epoch_millis" }, "itemId": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "itemType": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "timezone": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "metadata": { "properties": { "description": { - "type": "string", + "type": "text", "analyzer": "folding" }, "enabled": { @@ -53,22 +48,19 @@ "type": "boolean" }, "id": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "missingPlugins": { "type": "boolean" }, "name": { - "type": "string", - "analyzer": "folding" + "type": "keyword" }, "readOnly": { "type": "boolean" }, "scope": { - "type": "string", - "analyzer": "folding" + "type": "keyword" } } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/96a018ec/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/conditionType.json ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/conditionType.json b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/conditionType.json index 73ecac9..702a9f1 100644 --- a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/conditionType.json +++ b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/conditionType.json @@ -4,13 +4,73 @@ { "all": { "match": "*", - "match_mapping_type": "string", + "match_mapping_type": "text", "mapping": { - "type": "string", + "type": "text", "analyzer": "folding" } } } - ] + ], + "properties" : { + "conditionEvaluator" : { + "type" : "keyword" + }, + "itemId" : { + "type" : "keyword" + }, + "itemType" : { + "type" : "keyword" + }, + "metadata" : { + "properties" : { + "description" : { + "type" : "text", + "analyzer" : "folding" + }, + "enabled" : { + "type" : "boolean" + }, + "hidden" : { + "type" : "boolean" + }, + "id" : { + "type" : "keyword" + }, + "missingPlugins" : { + "type" : "boolean" + }, + "name" : { + "type" : "keyword" + }, + "readOnly" : { + "type" : "boolean" + }, + "tags" : { + "type" : "text", + "analyzer" : "folding" + } + } + }, + "parameters" : { + "properties" : { + "defaultValue" : { + "type" : "keyword" + }, + "id" : { + "type" : "keyword" + }, + "multivalued" : { + "type" : "boolean" + }, + "type" : { + "type" : "keyword" + } + } + }, + "queryBuilder" : { + "type" : "keyword" + } + } } } \ No newline at end of file
