- DMF-1133 Error when displaying an empty list in MF : fixed issue for real this time :)
- Some configuration file cleanup
- Changed inactive user purging from 30 days to 180 days
Signed-off-by: Serge Huber <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/a5b7b156
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/a5b7b156
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/a5b7b156

Branch: refs/heads/master
Commit: a5b7b15650afd5b9086eedfce8fa406c8095d580
Parents: b812349
Author: Serge Huber <[email protected]>
Authored: Fri Jan 6 11:15:28 2017 +0100
Committer: Serge Huber <[email protected]>
Committed: Fri Jan 6 11:15:28 2017 +0100

----------------------------------------------------------------------
 README.md                                          |  56 +--
 .../unomi/api/services/ClusterService.java         |   1 -
 kar/src/main/feature/feature.xml                   |   4 +-
 package/pom.xml                                    |  13 +-
 persistence-elasticsearch/core/pom.xml             |  20 -
 .../ElasticSearchPersistenceServiceImpl.java       | 397 +++++--------------
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  21 +-
 .../core/src/main/resources/elasticsearch.yml      | 110 -----
 .../core/src/main/resources/hazelcast.xml          | 219 ----------
 ...g.apache.unomi.persistence.elasticsearch.cfg    |  10 +-
 .../persistence/spi/PersistenceService.java        |   8 +
 pom.xml                                            |   2 -
 services/pom.xml                                   |  22 +
 .../services/services/ClusterServiceImpl.java      | 304 ++++++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  45 ++-
 services/src/main/resources/hazelcast.xml          | 219 ++++++++++
 .../main/resources/org.apache.unomi.cluster.cfg    |  20 +
 .../resources/org.apache.unomi.services.cfg        |   2 +-
 src/site/markdown/clustering.md                    |  32 +-
 src/site/markdown/configuration.md                 |  27 +-
 20 files changed, 739 insertions(+), 793 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index b73efa4..fd6e1d7 100644
--- a/README.md
+++ b/README.md
@@ -116,16 +116,9 
@@ with the following contents: cluster.name=contextElasticSearch index.name=context - elasticSearchConfig=file:${karaf.etc}/elasticsearch.yml And replace the cluster.name parameter here by your cluster name. - -You can also put an elasticsearch configuration file in $MY_KARAF_HOME/etc/elasticsearch.yml , -and put any standard Elasticsearch configuration options in this last file. - -If you want your context server to be a client only on a cluster of elasticsearch nodes, just set the node.data property -to false. - + Secured events configuration --------------------------- @@ -260,23 +253,16 @@ servers on the same network, and enable the discovery protocol in $MY_KARAF_HOME All nodes on the same network, sharing the same cluster name will be part of the same cluster. -###Recommended configurations - -It is recommended to have one node dedicated to the context server, where the other nodes take care of the -Elasticsearch persistence. The node dedicated to the context server will have node.data set to false. - #### 2 nodes configuration One node dedicated to context server, 1 node for elasticsearch storage. Node A : - node.data=true numberOfReplicas=0 monthlyIndex.numberOfReplicas=0 Node B : - node.data=false numberOfReplicas=0 monthlyIndex.numberOfReplicas=0 @@ -285,39 +271,19 @@ One node dedicated to context server, 2 nodes for elasticsearch storage with fau Node A : - node.data=false numberOfReplicas=1 monthlyIndex.numberOfReplicas=1 Node B : - node.data=true numberOfReplicas=1 monthlyIndex.numberOfReplicas=1 Node C : - node.data=true numberOfReplicas=1 monthlyIndex.numberOfReplicas=1 -### Specific configuration -If multicast is not allowed on your network, you'll need to switch to unicast protocol and manually configure the server IPs. 
This can be -done by disabling the elasticsearch automatic discovery in $MY_KARAF_HOME/etc/org.apache.unomi.persistence.elasticsearch.cfg : - - discovery.zen.ping.multicast.enabled=false - - -And then set the property discovery.zen.ping.unicast.hosts in $MY_KARAF_HOME/etc/elasticsearch.yml files : - - - discovery.zen.ping.unicast.hosts: [â192.168.0.1:9300', â192.168.0.2:9300'] - - -More information and configuration options can be found at : -[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html) - - JDK Selection on Mac OS X ------------------------- @@ -415,20 +381,6 @@ Of course any ports listed here are the default ports configured in each server, Step 2 : Adjust the Context Server IP filtering -By default the Context Server limits to connections to port 9200 and 9300 to the following IP ranges - - - localhost - - 127.0.0.1 - - ::1 - - the current subnet (i.e., 192.168.1.0-192.168.1.255) - -(this is done using a custom plugin for Elasticsearch, that you may find here : -https://git-wip-us.apache.org/repos/asf/incubator-unomi/context-server/persistence-elasticsearch/plugins/security) - -You can adjust this setting by using the following setting in the $MY_KARAF_HOME/etc/elasticsearch.yml file : - - security.ipranges: localhost,127.0.0.1,::1,10.0.1.0-10.0.1.255 - Step 3 : Follow industry recommended best practices for securing Elasticsearch You may find more valuable recommendations here : @@ -463,9 +415,3 @@ To upload the site to the Apache website, simply run after the above command has This operation takes a little bit of time, so don't interrupt it even if you're waiting for a while for it to complete (usually takes about 16 minutes !) - -Todo ----- - -- Look at possible integration with newsletter management systems such as MailChimp, for example to synchronize profile data with collected info. 
-- Integrate with machine learning implementations such as Prediction.io or Apache Mahout http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/api/src/main/java/org/apache/unomi/api/services/ClusterService.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java index 37fcd37..b851b78 100644 --- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java +++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java @@ -25,7 +25,6 @@ import java.util.List; /** * A service to access information about the context server's cluster. * - * TODO: rename to something less specific like ContextRuntimeService? */ public interface ClusterService { http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/kar/src/main/feature/feature.xml ---------------------------------------------------------------------- diff --git a/kar/src/main/feature/feature.xml b/kar/src/main/feature/feature.xml index 8e6ddb2..fa3ef1f 100644 --- a/kar/src/main/feature/feature.xml +++ b/kar/src/main/feature/feature.xml @@ -29,10 +29,10 @@ <configfile finalname="/etc/org.apache.unomi.plugins.request.cfg">mvn:org.apache.unomi/unomi-plugins-request/${project.version}/cfg/requestcfg</configfile> <configfile finalname="/etc/org.apache.unomi.services.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/servicescfg</configfile> <configfile finalname="/etc/org.apache.unomi.thirdparty.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/thirdpartycfg</configfile> + <configfile finalname="/etc/org.apache.unomi.cluster.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/clustercfg</configfile> + <configfile finalname="/etc/hazelcast.xml">mvn:org.apache.unomi/unomi-services/${project.version}/xml/hazelcastconfig</configfile> <configfile 
finalname="/etc/org.apache.unomi.privacy.cfg">mvn:org.apache.unomi/cxs-privacy-extension-services/${project.version}/cfg/privacycfg</configfile> <configfile finalname="/etc/org.apache.unomi.geonames.cfg">mvn:org.apache.unomi/cxs-geonames-services/${project.version}/cfg/geonamescfg</configfile> - <configfile finalname="/etc/elasticsearch.yml">mvn:org.apache.unomi/unomi-persistence-elasticsearch-core/${project.version}/yml/elasticsearchconfig</configfile> - <configfile finalname="/etc/hazelcast.xml">mvn:org.apache.unomi/unomi-persistence-elasticsearch-core/${project.version}/xml/hazelcastconfig</configfile> <bundle start-level="75">mvn:commons-io/commons-io/2.4</bundle> <bundle start-level="75">mvn:com.fasterxml.jackson.core/jackson-core/${version.jackson.core}</bundle> <bundle start-level="75">mvn:com.fasterxml.jackson.core/jackson-databind/${version.jackson.core}</bundle> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/package/pom.xml ---------------------------------------------------------------------- diff --git a/package/pom.xml b/package/pom.xml index 104723e..fd630fd 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -215,18 +215,7 @@ </artifactItem> <artifactItem> <groupId>org.apache.unomi</groupId> - <artifactId>unomi-persistence-elasticsearch-core</artifactId> - <version>${project.version}</version> - <classifier>elasticsearchconfig</classifier> - <type>yml</type> - <outputDirectory> - ${project.build.directory}/assembly/etc - </outputDirectory> - <destFileName>elasticsearch.yml</destFileName> - </artifactItem> - <artifactItem> - <groupId>org.apache.unomi</groupId> - <artifactId>unomi-persistence-elasticsearch-core</artifactId> + <artifactId>unomi-services</artifactId> <version>${project.version}</version> <classifier>hazelcastconfig</classifier> <type>xml</type> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/pom.xml 
---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml index f4b2a24..ad96cfb 100644 --- a/persistence-elasticsearch/core/pom.xml +++ b/persistence-elasticsearch/core/pom.xml @@ -54,16 +54,6 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>org.apache.karaf.cellar.core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>org.apache.karaf.cellar.config</artifactId> - <scope>provided</scope> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>20.0</version> @@ -255,16 +245,6 @@ <type>cfg</type> <classifier>elasticsearchcfg</classifier> </artifact> - <artifact> - <file>src/main/resources/elasticsearch.yml</file> - <type>yml</type> - <classifier>elasticsearchconfig</classifier> - </artifact> - <artifact> - <file>src/main/resources/hazelcast.xml</file> - <type>xml</type> - <classifier>hazelcastconfig</classifier> - </artifact> </artifacts> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index f98854b..7abcbd5 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -18,13 +18,6 @@ package org.apache.unomi.persistence.elasticsearch; import org.apache.commons.lang3.StringUtils; -import org.apache.karaf.cellar.config.ClusterConfigurationEvent; -import org.apache.karaf.cellar.config.Constants; -import org.apache.karaf.cellar.core.*; -import org.apache.karaf.cellar.core.control.SwitchStatus; -import org.apache.karaf.cellar.core.event.EventProducer; -import org.apache.karaf.cellar.core.event.EventType; -import org.apache.unomi.api.ClusterNode; import org.apache.unomi.api.Item; import org.apache.unomi.api.PartialList; import org.apache.unomi.api.TimestampedItem; @@ -32,11 +25,12 @@ import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.query.DateRange; import org.apache.unomi.api.query.IpRange; import org.apache.unomi.api.query.NumericRange; -import org.apache.unomi.api.services.ClusterService; import org.apache.unomi.persistence.elasticsearch.conditions.*; import org.apache.unomi.persistence.spi.CustomObjectMapper; import org.apache.unomi.persistence.spi.PersistenceService; import org.apache.unomi.persistence.spi.aggregate.*; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; @@ -83,22 +77,13 @@ import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.osgi.framework.*; -import org.osgi.service.cm.ConfigurationAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.*; -import 
javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.lang.management.ManagementFactory; -import java.lang.management.OperatingSystemMXBean; -import java.lang.management.RuntimeMXBean; import java.net.InetAddress; -import java.net.MalformedURLException; import java.net.URL; import java.net.UnknownHostException; import java.text.ParseException; @@ -108,7 +93,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @SuppressWarnings("rawtypes") -public class ElasticSearchPersistenceServiceImpl implements PersistenceService, ClusterService, SynchronousBundleListener { +public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); @@ -116,18 +101,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public static final String CONTEXTSERVER_PORT = "contextserver.port"; public static final String CONTEXTSERVER_SECURE_ADDRESS = "contextserver.secureAddress"; public static final String CONTEXTSERVER_SECURE_PORT = "contextserver.securePort"; + public static final String NUMBER_OF_SHARDS = "number_of_shards"; public static final String NUMBER_OF_REPLICAS = "number_of_replicas"; public static final String CLUSTER_NAME = "cluster.name"; + public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name"; public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests"; public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions"; public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize"; public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval"; public 
static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy"; - public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION = "org.apache.unoni.nodes"; - public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS = "publicEndpoints"; - public static final String KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS = "secureEndpoints"; private Client client; private BulkProcessor bulkProcessor; @@ -137,19 +121,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private String monthlyIndexNumberOfReplicas; private String numberOfShards; private String numberOfReplicas; - private String elasticSearchConfig = null; private BundleContext bundleContext; private Map<String, String> mappings = new HashMap<String, String>(); private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher; private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher; - private ClusterManager karafCellarClusterManager; - private EventProducer karafCellarEventProducer; - private GroupManager karafCellarGroupManager; - private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME; - private ConfigurationAdmin osgiConfigurationAdmin; - private String karafJMXUsername = "karaf"; - private String karafJMXPassword = "karaf"; - private int karafJMXPort = 1099; private Map<String,String> indexNames; private List<String> itemsMonthlyIndexed; @@ -171,6 +146,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private String bulkProcessorFlushInterval = "5s"; private String bulkProcessorBackoffPolicy = "exponential"; + private String minimalElasticSearchVersion = "5.0.0"; + private String maximalElasticSearchVersion = "5.2.0"; + private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>(); public void setBundleContext(BundleContext bundleContext) { @@ -233,10 +211,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, 
this.routingByType = routingByType; } - public void setElasticSearchConfig(String elasticSearchConfig) { - this.elasticSearchConfig = elasticSearchConfig; - } - public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher conditionEvaluatorDispatcher) { this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher; } @@ -269,45 +243,21 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy; } - public void setKarafCellarClusterManager(ClusterManager karafCellarClusterManager) { - this.karafCellarClusterManager = karafCellarClusterManager; + public void setMinimalElasticSearchVersion(String minimalElasticSearchVersion) { + this.minimalElasticSearchVersion = minimalElasticSearchVersion; } - public void setKarafCellarEventProducer(EventProducer karafCellarEventProducer) { - this.karafCellarEventProducer = karafCellarEventProducer; + public void setMaximalElasticSearchVersion(String maximalElasticSearchVersion) { + this.maximalElasticSearchVersion = maximalElasticSearchVersion; } - public void setKarafCellarGroupManager(GroupManager karafCellarGroupManager) { - this.karafCellarGroupManager = karafCellarGroupManager; - } - - public void setKarafCellarGroupName(String karafCellarGroupName) { - this.karafCellarGroupName = karafCellarGroupName; - } - - public void setOsgiConfigurationAdmin(ConfigurationAdmin osgiConfigurationAdmin) { - this.osgiConfigurationAdmin = osgiConfigurationAdmin; - } - - public void setKarafJMXUsername(String karafJMXUsername) { - this.karafJMXUsername = karafJMXUsername; - } - - public void setKarafJMXPassword(String karafJMXPassword) { - this.karafJMXPassword = karafJMXPassword; - } - - public void setKarafJMXPort(int karafJMXPort) { - this.karafJMXPort = karafJMXPort; - } - - public void start() { + public void start() throws Exception { loadPredefinedMappings(bundleContext, false); // on startup new InClassLoaderExecute<Object>() { - public 
Object execute(Object... args) { + public Object execute(Object... args) throws Exception { logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index name " + indexName + "..."); address = System.getProperty(CONTEXTSERVER_ADDRESS, address); @@ -315,51 +265,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, secureAddress = System.getProperty(CONTEXTSERVER_SECURE_ADDRESS, secureAddress); securePort = System.getProperty(CONTEXTSERVER_SECURE_PORT, securePort); - if (karafCellarEventProducer != null && karafCellarClusterManager != null) { - - boolean setupConfigOk = true; - Group group = karafCellarGroupManager.findGroupByName(karafCellarGroupName); - if (setupConfigOk && group == null) { - logger.error("Cluster group " + karafCellarGroupName + " doesn't exist"); - setupConfigOk = false; - } - - // check if the producer is ON - if (setupConfigOk && karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) { - logger.error("Cluster event producer is OFF"); - setupConfigOk = false; - } - - // check if the config pid is allowed - if (setupConfigOk && !isClusterConfigPIDAllowed(group, Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, EventType.OUTBOUND)) { - logger.error("Configuration PID " + KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster group " + karafCellarGroupName); - setupConfigOk = false; - } - - if (setupConfigOk) { - Map<String, Properties> configurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName); - org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode(); - Properties karafCellarClusterNodeConfiguration = configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION); - if (karafCellarClusterNodeConfiguration == null) { - karafCellarClusterNodeConfiguration = new Properties(); - } - String publicEndpointsPropValue = 
karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + address + ":" + port); - String secureEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS, thisKarafNode.getId() + "=" + secureAddress + ":" + securePort); - String[] publicEndpointsArray = publicEndpointsPropValue.split(","); - Set<String> publicEndpoints = new TreeSet<String>(Arrays.asList(publicEndpointsArray)); - String[] secureEndpointsArray = secureEndpointsPropValue.split(","); - Set<String> secureEndpoints = new TreeSet<String>(Arrays.asList(secureEndpointsArray)); - publicEndpoints.add(thisKarafNode.getId() + "=" + address + ":" + port); - secureEndpoints.add(thisKarafNode.getId() + "=" + secureAddress + ":" + securePort); - karafCellarClusterNodeConfiguration.setProperty(KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, StringUtils.join(publicEndpoints, ",")); - karafCellarClusterNodeConfiguration.setProperty(KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS, StringUtils.join(secureEndpoints, ",")); - configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarClusterNodeConfiguration); - ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION); - clusterConfigurationEvent.setSourceGroup(group); - karafCellarEventProducer.produce(clusterConfigurationEvent); - } - } - bulkProcessorName = System.getProperty(BULK_PROCESSOR_NAME, bulkProcessorName); bulkProcessorConcurrentRequests = System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS, bulkProcessorConcurrentRequests); bulkProcessorBulkActions = System.getProperty(BULK_PROCESSOR_BULK_ACTIONS, bulkProcessorBulkActions); @@ -373,7 +278,29 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, client = new PreBuiltTransportClient(transportSettings) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(address), 
9300)); } catch (UnknownHostException e) { - logger.error("Error resolving address " + address + " ElasticSearch transport client not connected, using internal client instead", e); + String message = "Error resolving address " + address + " ElasticSearch transport client not connected"; + throw new Exception(message, e); + } + + // let's now check the versions of all the nodes in the cluster, to make sure they are as expected. + try { + NodesInfoResponse nodesInfoResponse = client.admin().cluster().prepareNodesInfo() + .all().execute().get(); + + org.elasticsearch.Version minimalVersion = org.elasticsearch.Version.fromString(minimalElasticSearchVersion); + org.elasticsearch.Version maximalVersion = org.elasticsearch.Version.fromString(maximalElasticSearchVersion); + for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) { + org.elasticsearch.Version version = nodeInfo.getVersion(); + if (version.before(minimalVersion) || + version.equals(maximalVersion) || + version.after(maximalVersion)) { + throw new Exception("ElasticSearch version on node " + nodeInfo.getHostname() + " is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !"); + } + } + } catch (InterruptedException e) { + throw new Exception("Error checking ElasticSearch versions", e); + } catch (ExecutionException e) { + throw new Exception("Error checking ElasticSearch versions", e); } // @todo is there a better way to detect index existence than to wait for it to startup ? @@ -433,7 +360,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.info("Cluster status is GREEN"); - return null; + return true; } }.executeInClassLoader(); @@ -467,19 +394,19 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private void refreshExistingIndexNames() { new InClassLoaderExecute<Boolean>() { - protected Boolean execute(Object... args) { + protected Boolean execute(Object... 
args) throws Exception { try { logger.info("Refreshing existing indices list..."); IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get(); existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet()); } catch (InterruptedException e) { - logger.error("Error retrieving indices stats", e); + throw new Exception("Error retrieving indices stats", e); } catch (ExecutionException e) { - logger.error("Error retrieving indices stats", e); + throw new Exception("Error retrieving indices stats", e); } return true; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } public BulkProcessor getBulkProcessor() { @@ -581,7 +508,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return null; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); if (timer != null) { timer.cancel(); @@ -706,7 +633,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public <T extends Item> T load(final String itemId, final Date dateHint, final Class<T> clazz) { return new InClassLoaderExecute<T>() { - protected T execute(Object... args) { + protected T execute(Object... 
args) throws Exception { try { String itemType = (String) clazz.getField("ITEM_TYPE").get(null); @@ -733,15 +660,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } catch (IndexNotFoundException e) { - logger.debug("No index found for itemType=" + clazz.getName() + " itemId=" + itemId, e); + throw new Exception("No index found for itemType=" + clazz.getName() + " itemId=" + itemId, e); } catch (IllegalAccessException e) { - logger.error("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, e); + throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, e); } catch (Exception t) { - logger.error("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, t); + throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, t); } - return null; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } @@ -754,7 +680,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean save(final Item item, final boolean useBatching) { return new InClassLoaderExecute<Boolean>() { - protected Boolean execute(Object... args) { + protected Boolean execute(Object... 
args) throws Exception { try { String source = CustomObjectMapper.getObjectMapper().writeValueAsString(item); String itemType = item.getItemType(); @@ -794,11 +720,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return true; } catch (IOException e) { - logger.error("Error saving item " + item, e); + throw new Exception("Error saving item " + item, e); } - return false; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } @@ -810,7 +735,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) { return new InClassLoaderExecute<Boolean>() { - protected Boolean execute(Object... args) { + protected Boolean execute(Object... args) throws Exception { try { String itemType = (String) clazz.getField("ITEM_TYPE").get(null); @@ -827,21 +752,20 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return true; } catch (IndexNotFoundException e) { - logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); + throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); } catch (NoSuchFieldException e) { - logger.error("Error updating item " + itemId, e); + throw new Exception("Error updating item " + itemId, e); } catch (IllegalAccessException e) { - logger.error("Error updating item " + itemId, e); + throw new Exception("Error updating item " + itemId, e); } - return false; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } @Override public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) { return new InClassLoaderExecute<Boolean>() { - protected Boolean execute(Object... args) { + protected Boolean execute(Object... 
args) throws Exception { try { String itemType = (String) clazz.getField("ITEM_TYPE").get(null); @@ -859,21 +783,20 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return true; } catch (IndexNotFoundException e) { - logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); + throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); } catch (NoSuchFieldException e) { - logger.error("Error updating item " + itemId, e); + throw new Exception("Error updating item " + itemId, e); } catch (IllegalAccessException e) { - logger.error("Error updating item " + itemId, e); + throw new Exception("Error updating item " + itemId, e); } - return false; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } @Override public <T extends Item> boolean remove(final String itemId, final Class<T> clazz) { return new InClassLoaderExecute<Boolean>() { - protected Boolean execute(Object... args) { + protected Boolean execute(Object... args) throws Exception { //Index the query = register it in the percolator try { String itemType = (String) clazz.getField("ITEM_TYPE").get(null); @@ -882,16 +805,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .execute().actionGet(); return true; } catch (Exception e) { - logger.error("Cannot remove", e); + throw new Exception("Cannot remove", e); } - return false; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) { return new InClassLoaderExecute<Boolean>() { - protected Boolean execute(Object... args) { + protected Boolean execute(Object... 
args) throws Exception { try { String itemType = (String) clazz.getField("ITEM_TYPE").get(null); @@ -932,11 +854,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return true; } catch (Exception e) { - logger.error("Cannot remove by query", e); + throw new Exception("Cannot remove by query", e); } - return false; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } public boolean createIndex(final String indexName) { @@ -956,7 +877,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return !indexExists; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } public boolean removeIndex(final String indexName) { @@ -970,7 +891,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return indexExists; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } private void internalCreateIndex(String indexName, Map<String,String> mappings) { @@ -1029,7 +950,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public Map<String, Map<String, Object>> getPropertiesMapping(final String itemType) { return new InClassLoaderExecute<Map<String, Map<String, Object>>>() { @SuppressWarnings("unchecked") - protected Map<String, Map<String, Object>> execute(Object... args) { + protected Map<String, Map<String, Object>> execute(Object... 
args) throws Exception { GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings().setTypes(itemType).execute().actionGet(); ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getMappingsResponse.getMappings(); Map<String, Map<String, Object>> propertyMap = new HashMap<>(); @@ -1054,11 +975,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } catch (IOException e) { - logger.error("Cannot get mapping", e); + throw new Exception("Cannot get mapping", e); } return propertyMap; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } public Map<String, Object> getPropertyMapping(String property, String itemType) { @@ -1089,6 +1010,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private String getPropertyNameWithData(String name, String itemType) { Map<String,Object> propertyMapping = getPropertyMapping(name,itemType); + if (propertyMapping == null) { + return null; + } if (propertyMapping != null && "text".equals(propertyMapping.get("type")) && propertyMapping.containsKey("fields") @@ -1100,7 +1024,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean saveQuery(final String queryName, final String query) { return new InClassLoaderExecute<Boolean>() { - protected Boolean execute(Object... args) { + protected Boolean execute(Object... 
args) throws Exception { //Index the query = register it in the percolator try { logger.info("Saving query : " + queryName); @@ -1110,11 +1034,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .execute().actionGet(); return true; } catch (Exception e) { - logger.error("Cannot save query", e); + throw new Exception("Cannot save query", e); } - return false; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } @Override @@ -1129,7 +1052,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public boolean removeQuery(final String queryName) { return new InClassLoaderExecute<Boolean>() { - protected Boolean execute(Object... args) { + protected Boolean execute(Object... args) throws Exception { //Index the query = register it in the percolator try { client.prepareDelete(indexName, ".percolator", queryName) @@ -1137,11 +1060,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .execute().actionGet(); return true; } catch (Exception e) { - logger.error("Cannot delete query", e); + throw new Exception("Cannot delete query", e); } - return false; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } @Override @@ -1238,14 +1160,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, .actionGet(); return response.getHits().getTotalHits(); } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing, final String scrollTimeValidity) { return new InClassLoaderExecute<PartialList<T>>() { @Override - protected PartialList<T> execute(Object... args) { + protected PartialList<T> execute(Object... 
args) throws Exception { List<T> results = new ArrayList<T>(); String scrollIdentifier = null; long totalHits = 0; @@ -1348,7 +1270,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } } catch (Exception t) { - logger.error("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t); + throw new Exception("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t); } PartialList<T> result = new PartialList<T>(results, offset, size, totalHits); @@ -1358,7 +1280,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return result; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } @Override @@ -1366,7 +1288,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return new InClassLoaderExecute<PartialList<T>>() { @Override - protected PartialList<T> execute(Object... args) { + protected PartialList<T> execute(Object... 
args) throws Exception { List<T> results = new ArrayList<T>(); long totalHits = 0; try { @@ -1391,11 +1313,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } return result; } catch (Exception t) { - logger.error("Error continuing scrolling query for itemType=" + clazz.getName() + " scrollIdentifier=" + scrollIdentifier + " scrollTimeValidity=" + scrollTimeValidity, t); + throw new Exception("Error continuing scrolling query for itemType=" + clazz.getName() + " scrollIdentifier=" + scrollIdentifier + " scrollTimeValidity=" + scrollTimeValidity, t); } - return null; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } @Override @@ -1520,7 +1441,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, return results; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } private <T extends Item> String getItemType(Class<T> clazz) { @@ -1544,105 +1465,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } - @Override - public List<ClusterNode> getClusterNodes() { - return new InClassLoaderExecute<List<ClusterNode>>() { - - @Override - protected List<ClusterNode> execute(Object... 
args) { - Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, ClusterNode>(); - - Set<org.apache.karaf.cellar.core.Node> karafCellarNodes = karafCellarClusterManager.listNodes(); - org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode(); - Map<String, Properties> clusterConfigurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName); - Properties karafCellarClusterNodeConfiguration = clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION); - Map<String, String> publicNodeEndpoints = new TreeMap<>(); - Map<String, String> secureNodeEndpoints = new TreeMap<>(); - if (karafCellarClusterNodeConfiguration != null) { - String publicEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + address + ":" + port); - String secureEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS, thisKarafNode.getId() + "=" + secureAddress + ":" + securePort); - String[] publicEndpointsArray = publicEndpointsPropValue.split(","); - Set<String> publicEndpoints = new TreeSet<String>(Arrays.asList(publicEndpointsArray)); - for (String endpoint : publicEndpoints) { - String[] endpointParts = endpoint.split("="); - publicNodeEndpoints.put(endpointParts[0], endpointParts[1]); - } - String[] secureEndpointsArray = secureEndpointsPropValue.split(","); - Set<String> secureEndpoints = new TreeSet<String>(Arrays.asList(secureEndpointsArray)); - for (String endpoint : secureEndpoints) { - String[] endpointParts = endpoint.split("="); - secureNodeEndpoints.put(endpointParts[0], endpointParts[1]); - } - } - for (org.apache.karaf.cellar.core.Node karafCellarNode : karafCellarNodes) { - ClusterNode clusterNode = new ClusterNode(); - clusterNode.setHostName(karafCellarNode.getHost()); - String publicEndpoint = 
publicNodeEndpoints.get(karafCellarNode.getId()); - if (publicEndpoint != null) { - String[] publicEndpointParts = publicEndpoint.split(":"); - clusterNode.setHostAddress(publicEndpointParts[0]); - clusterNode.setPublicPort(Integer.parseInt(publicEndpointParts[1])); - } - String secureEndpoint = secureNodeEndpoints.get(karafCellarNode.getId()); - if (secureEndpoint != null) { - String[] secureEndpointParts = secureEndpoint.split(":"); - clusterNode.setSecureHostAddress(secureEndpointParts[0]); - clusterNode.setSecurePort(Integer.parseInt(secureEndpointParts[1])); - clusterNode.setMaster(false); - clusterNode.setData(false); - } - try { - // now let's connect to remote JMX service to retrieve information from the runtime and operating system MX beans - JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"+karafCellarNode.getHost() + ":"+karafJMXPort+"/karaf-root"); - Map<String,Object> environment=new HashMap<String,Object>(); - if (karafJMXUsername != null && karafJMXPassword != null) { - environment.put(JMXConnector.CREDENTIALS,new String[]{karafJMXUsername,karafJMXPassword}); - } - JMXConnector jmxc = JMXConnectorFactory.connect(url, environment); - MBeanServerConnection mbsc = jmxc.getMBeanServerConnection(); - final RuntimeMXBean remoteRuntime = ManagementFactory.newPlatformMXBeanProxy(mbsc, ManagementFactory.RUNTIME_MXBEAN_NAME, RuntimeMXBean.class); - clusterNode.setUptime(remoteRuntime.getUptime()); - ObjectName operatingSystemMXBeanName = new ObjectName(ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME); - Double processCpuLoad = null; - Double systemCpuLoad = null; - try { - processCpuLoad = (Double) mbsc.getAttribute(operatingSystemMXBeanName, "ProcessCpuLoad"); - } catch (MBeanException e) { - e.printStackTrace(); - } catch (AttributeNotFoundException e) { - e.printStackTrace(); - } - try { - systemCpuLoad = (Double) mbsc.getAttribute(operatingSystemMXBeanName, "SystemCpuLoad"); - } catch (MBeanException e) { - e.printStackTrace(); - } 
catch (AttributeNotFoundException e) { - e.printStackTrace(); - } - final OperatingSystemMXBean remoteOperatingSystemMXBean = ManagementFactory.newPlatformMXBeanProxy(mbsc, ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME, OperatingSystemMXBean.class); - clusterNode.setLoadAverage(new double[] { remoteOperatingSystemMXBean.getSystemLoadAverage()}); - if (systemCpuLoad != null) { - clusterNode.setCpuLoad(systemCpuLoad); - } - - } catch (MalformedURLException e) { - logger.error("Error connecting to remote JMX server", e); - } catch (IOException e) { - logger.error("Error retrieving remote JMX data", e); - } catch (MalformedObjectNameException e) { - logger.error("Error retrieving remote JMX data", e); - } catch (InstanceNotFoundException e) { - logger.error("Error retrieving remote JMX data", e); - } catch (ReflectionException e) { - logger.error("Error retrieving remote JMX data", e); - } - clusterNodes.put(karafCellarNode.getId(), clusterNode); - } - - return new ArrayList<ClusterNode>(clusterNodes.values()); - } - }.executeInClassLoader(); - } @Override public void refresh() { @@ -1654,7 +1476,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, client.admin().indices().refresh(Requests.refreshRequest()).actionGet(); return true; } - }.executeInClassLoader(); + }.catchingExecuteInClassLoader(true); } @@ -1663,7 +1485,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public void purge(final Date date) { new InClassLoaderExecute<Object>() { @Override - protected Object execute(Object... args) { + protected Object execute(Object... 
args) throws Exception {
 IndicesStatsResponse statsResponse = client.admin().indices().prepareStats(indexName + "-*")
 .setIndexing(false)
 .setGet(false)
@@ -1689,7 +1511,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 toDelete.add(currentIndexName);
 }
 } catch (ParseException e) {
- logger.error("Cannot parse index name " + currentIndexName, e);
+ throw new Exception("Cannot parse index name " + currentIndexName, e);
 }
 }
 }
@@ -1698,7 +1520,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 }
 return null;
 }
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
 }

 @Override
@@ -1743,7 +1565,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 return null;
 }
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
 }

 @Override
@@ -1794,7 +1616,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 }
 return results;
 }
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
 }

 private String getIndexNameForQuery(String itemType) {
@@ -1804,9 +1626,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 public abstract static class InClassLoaderExecute<T> {

- protected abstract T execute(Object... args);
+ protected abstract T execute(Object... args) throws Exception;

- public T executeInClassLoader(Object... args) {
+ public T executeInClassLoader(Object... args) throws Exception {
 ClassLoader tccl = Thread.currentThread().getContextClassLoader();
 try {
 Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
@@ -1815,6 +1637,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 Thread.currentThread().setContextClassLoader(tccl);
 }
 }
+
+ public T catchingExecuteInClassLoader( boolean logError, Object... args) {
+ try {
+ return executeInClassLoader(args);
+ } catch (Exception e) {
+ logger.error("Error while executing in class loader", e);
+ }
+ return null;
+ }
 }

 private String getConfig(Map<String,String> settings, String key,
@@ -1825,21 +1656,5 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 return defaultValue;
 }

- /**
- * Check if a configuration is allowed.
- *
- * @param group the cluster group.
- * @param category the configuration category constant.
- * @param pid the configuration PID.
- * @param type the cluster event type.
- * @return true if the cluster event type is allowed, false else.
- */
- public boolean isClusterConfigPIDAllowed(Group group, String category, String pid, EventType type) {
- CellarSupport support = new CellarSupport();
- support.setClusterManager(this.karafCellarClusterManager);
- support.setGroupManager(this.karafCellarGroupManager);
- support.setConfigurationAdmin(this.osgiConfigurationAdmin);
- return support.isAllowed(group, category, pid, type);
- }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index c929009..1135ef5 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -32,7 +32,6 @@
 <cm:default-properties>
 <cm:property name="cluster.name" value="contextElasticSearch"/>
 <cm:property name="index.name" value="context"/>
- <cm:property name="elasticSearchConfig" value="file:${karaf.etc}/elasticsearch.yml"/>
 <cm:property name="numberOfShards" value="5"/>
 <cm:property name="numberOfReplicas" value="0"/>
<cm:property name="monthlyIndex.numberOfShards" value="3"/> @@ -46,10 +45,9 @@ <cm:property name="bulkProcessor.flushInterval" value="5s" /> <cm:property name="bulkProcessor.backoffPolicy" value="exponential" /> - <cm:property name="cluster.group" value="default" /> - <cm:property name="cluster.jmxUsername" value="karaf" /> - <cm:property name="cluster.jmxPassword" value="karaf" /> - <cm:property name="cluster.jmxPort" value="1099" /> + <cm:property name="minimalElasticSearchVersion" value="5.0.0" /> + <cm:property name="maximalElasticSearchVersion" value="5.2.0" /> + </cm:default-properties> </cm:property-placeholder> @@ -71,7 +69,6 @@ <service id="elasticSearchPersistenceService" ref="elasticSearchPersistenceServiceImpl"> <interfaces> <value>org.apache.unomi.persistence.spi.PersistenceService</value> - <value>org.apache.unomi.api.services.ClusterService</value> </interfaces> </service> @@ -101,7 +98,6 @@ <property name="secureAddress" value="${web.contextserver.secureAddress}"/> <property name="securePort" value="${web.contextserver.securePort}"/> <property name="defaultQueryLimit" value="${es.defaultQueryLimit}"/> - <property name="elasticSearchConfig" value="${es.elasticSearchConfig}"/> <property name="itemsMonthlyIndexed"> <list> <value>event</value> @@ -123,14 +119,9 @@ <property name="bulkProcessorBulkSize" value="${es.bulkProcessor.bulkSize}" /> <property name="bulkProcessorFlushInterval" value="${es.bulkProcessor.flushInterval}" /> <property name="bulkProcessorBackoffPolicy" value="${es.bulkProcessor.backoffPolicy}" /> - <property name="karafCellarClusterManager" ref="karafCellarClusterManager" /> - <property name="karafCellarEventProducer" ref="karafCellarEventProducer" /> - <property name="karafCellarGroupManager" ref="karafCellarGroupManager" /> - <property name="karafCellarGroupName" value="${es.cluster.group}" /> - <property name="osgiConfigurationAdmin" ref="osgiConfigurationAdmin" /> - <property name="karafJMXUsername" 
value="${es.cluster.jmxUsername}" /> - <property name="karafJMXPassword" value="${es.cluster.jmxPassword}" /> - <property name="karafJMXPort" value="${es.cluster.jmxPort}" /> + + <property name="minimalElasticSearchVersion" value="${es.minimalElasticSearchVersion}" /> + <property name="maximalElasticSearchVersion" value="${es.maximalElasticSearchVersion}" /> </bean> <!-- We use a listener here because using the list directly for listening to proxies coming from the same bundle didn't seem to work --> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/resources/elasticsearch.yml ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/elasticsearch.yml b/persistence-elasticsearch/core/src/main/resources/elasticsearch.yml deleted file mode 100644 index fe0e52f..0000000 --- a/persistence-elasticsearch/core/src/main/resources/elasticsearch.yml +++ /dev/null @@ -1,110 +0,0 @@ -# ======================== Elasticsearch Configuration ========================= -# -# NOTE: Elasticsearch comes with reasonable defaults for most settings. -# Before you set out to tweak and tune the configuration, make sure you -# understand what are you trying to accomplish and the consequences. -# -# The primary way of configuring a node is via this file. This template lists -# the most important settings you may want to configure for a production cluster. 
-# -# Please see the documentation for further information on configuration options: -# <http://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html> -# -# ---------------------------------- Cluster ----------------------------------- -# -# Use a descriptive name for your cluster: -# -# cluster.name: my-application -# -# ------------------------------------ Node ------------------------------------ -# -# Use a descriptive name for the node: -# -# node.name: node-1 -# -# Add custom attributes to the node: -# -# node.rack: r1 -# -# ----------------------------------- Paths ------------------------------------ -# -# Path to directory where to store the data (separate multiple locations by comma): -# -# path.data: /path/to/data -# -# Path to log files: -# -# path.logs: /path/to/logs -# -# ----------------------------------- Memory ----------------------------------- -# -# Lock the memory on startup: -# -# bootstrap.mlockall: true -# -# Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory -# available on the system and that the owner of the process is allowed to use this limit. -# -# Elasticsearch performs poorly when the system is swapping the memory. 
-# -# ---------------------------------- Network ----------------------------------- -# -# Set the bind address to a specific IP (IPv4 or IPv6): -# -# network.host: 192.168.0.1 -# -# Set a custom port for HTTP: -# -# http.port: 9200 -# -# For more information, see the documentation at: -# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html> -# -# --------------------------------- Discovery ---------------------------------- -# -# Pass an initial list of hosts to perform discovery when new node is started: -# The default list of hosts is ["127.0.0.1", "[::1]"] -# -# discovery.zen.ping.unicast.hosts: ["host1", "host2"] -# -# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1): -# -# discovery.zen.minimum_master_nodes: 3 -# -# For more information, see the documentation at: -# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html> -# -# ---------------------------------- Gateway ----------------------------------- -# -# Block initial recovery after a full cluster restart until N nodes are started: -# -# gateway.recover_after_nodes: 3 -# -# For more information, see the documentation at: -# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-gateway.html> -# -# ---------------------------------- Various ----------------------------------- -# -# Disable starting multiple nodes on a single system: -# -# node.max_local_storage_nodes: 1 -# -# Require explicit names when deleting indices: -# -# action.destructive_requires_name: true -# -# --------------------------- Apache Unomi specific --------------------------- -# -threadpool.index.queue_size: 1000 -script.engine.groovy.inline.update: on -index.percolator.allow_unmapped_fields: true - -# Require explicit index creation -action.auto_create_index: false - -# Protect against accidental close/delete operations -# on all indices. 
You can still close/delete individual -# indices -#action.disable_close_all_indices: true -#action.disable_delete_all_indices: true -#action.disable_shutdown: true http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/resources/hazelcast.xml ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/hazelcast.xml b/persistence-elasticsearch/core/src/main/resources/hazelcast.xml deleted file mode 100644 index 0fc6f5d..0000000 --- a/persistence-elasticsearch/core/src/main/resources/hazelcast.xml +++ /dev/null @@ -1,219 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> -<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.2.xsd" - xmlns="http://www.hazelcast.com/schema/config" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <group> - <name>cellar</name> - <password>pass</password> - </group> - <management-center enabled="false">http://localhost:8080/mancenter</management-center> - <network> - <port auto-increment="true" port-count="100">5701</port> - <outbound-ports> - <!-- - Allowed port range when connecting to other nodes. - 0 or * means use system provided port. - --> - <ports>0</ports> - </outbound-ports> - <join> - <multicast enabled="false"> - <multicast-group>224.2.2.3</multicast-group> - <multicast-port>54327</multicast-port> - </multicast> - <tcp-ip enabled="true"> - <interface>127.0.0.1</interface> - </tcp-ip> - <aws enabled="false"> - <access-key>my-access-key</access-key> - <secret-key>my-secret-key</secret-key> - <!--optional, default is us-east-1 --> - <region>us-west-1</region> - <!--optional, default is ec2.amazonaws.com. 
If set, region shouldn't be set as it will override this property --> - <host-header>ec2.amazonaws.com</host-header> - <!-- optional, only instances belonging to this group will be discovered, default will try all running instances --> - <security-group-name>hazelcast-sg</security-group-name> - <tag-key>type</tag-key> - <tag-value>hz-nodes</tag-value> - </aws> - </join> - <interfaces enabled="false"> - <interface>10.10.1.*</interface> - </interfaces> - <ssl enabled="false"/> - <socket-interceptor enabled="false"/> - <symmetric-encryption enabled="false"> - <!-- - encryption algorithm such as - DES/ECB/PKCS5Padding, - PBEWithMD5AndDES, - AES/CBC/PKCS5Padding, - Blowfish, - DESede - --> - <algorithm>PBEWithMD5AndDES</algorithm> - <!-- salt value to use when generating the secret key --> - <salt>thesalt</salt> - <!-- pass phrase to use when generating the secret key --> - <password>thepass</password> - <!-- iteration count to use when generating the secret key --> - <iteration-count>19</iteration-count> - </symmetric-encryption> - </network> - <partition-group enabled="false"/> - <executor-service> - <pool-size>16</pool-size> - <!-- Queue capacity. 0 means Integer.MAX_VALUE --> - <queue-capacity>0</queue-capacity> - </executor-service> - <queue name="default"> - <!-- - Maximum size of the queue. When a JVM's local queue size reaches the maximum, - all put/offer operations will get blocked until the queue size - of the JVM goes down below the maximum. - Any integer between 0 and Integer.MAX_VALUE. 0 means - Integer.MAX_VALUE. Default is 0. - --> - <max-size>0</max-size> - <!-- - Number of backups. If 1 is set as the backup-count for example, - then all entries of the map will be copied to another JVM for - fail-safety. 0 means no backup. - --> - <backup-count>1</backup-count> - <!-- - Number of async backups. 0 means no backup. 
- --> - <async-backup-count>0</async-backup-count> - <empty-queue-ttl>-1</empty-queue-ttl> - </queue> - - <map name="default"> - <!-- - Data type that will be used for storing recordMap. - Possible values: - BINARY (default): keys and values will be stored as binary data - OBJECT : values will be stored in their object forms - OFFHEAP : values will be stored in non-heap region of JVM - --> - <in-memory-format>BINARY</in-memory-format> - <!-- - Number of backups. If 1 is set as the backup-count for example, - then all entries of the map will be copied to another JVM for - fail-safety. 0 means no backup. - --> - <backup-count>1</backup-count> - <!-- - Number of async backups. 0 means no backup. - --> - <async-backup-count>0</async-backup-count> - <!-- - Maximum number of seconds for each entry to stay in the map. Entries that are - older than <time-to-live-seconds> and not updated for <time-to-live-seconds> - will get automatically evicted from the map. - Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0. - --> - <time-to-live-seconds>0</time-to-live-seconds> - <!-- - Maximum number of seconds for each entry to stay idle in the map. Entries that are - idle(not touched) for more than <max-idle-seconds> will get - automatically evicted from the map. Entry is touched if get, put or containsKey is called. - Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0. - --> - <max-idle-seconds>0</max-idle-seconds> - <!-- - Valid values are: - NONE (no eviction), - LRU (Least Recently Used), - LFU (Least Frequently Used). - NONE is the default. - --> - <eviction-policy>NONE</eviction-policy> - <!-- - Maximum size of the map. When max size is reached, - map is evicted based on the policy defined. - Any integer between 0 and Integer.MAX_VALUE. 0 means - Integer.MAX_VALUE. Default is 0. - --> - <max-size policy="PER_NODE">0</max-size> - <!-- - When max. size is reached, specified percentage of - the map will be evicted. 
Any integer between 0 and 100. - If 25 is set for example, 25% of the entries will - get evicted. - --> - <eviction-percentage>25</eviction-percentage> - <!-- - While recovering from split-brain (network partitioning), - map entries in the small cluster will merge into the bigger cluster - based on the policy set here. When an entry merge into the - cluster, there might an existing entry with the same key already. - Values of these entries might be different for that same key. - Which value should be set for the key? Conflict is resolved by - the policy set here. Default policy is PutIfAbsentMapMergePolicy - - There are built-in merge policies such as - com.hazelcast.map.merge.PassThroughMergePolicy; entry will be added if there is no existing entry for the key. - com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be added if the merging entry doesn't exist in the cluster. - com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins. - com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins. 
- --> - <merge-policy>com.hazelcast.map.merge.PassThroughMergePolicy</merge-policy> - </map> - - <multimap name="default"> - <backup-count>1</backup-count> - <value-collection-type>SET</value-collection-type> - </multimap> - - <multimap name="default"> - <backup-count>1</backup-count> - <value-collection-type>SET</value-collection-type> - </multimap> - - <list name="default"> - <backup-count>1</backup-count> - </list> - - <set name="default"> - <backup-count>1</backup-count> - </set> - - <jobtracker name="default"> - <max-thread-size>0</max-thread-size> - <!-- Queue size 0 means number of partitions * 2 --> - <queue-size>0</queue-size> - <retry-count>0</retry-count> - <chunk-size>1000</chunk-size> - <communicate-stats>true</communicate-stats> - <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy> - </jobtracker> - - <semaphore name="default"> - <initial-permits>0</initial-permits> - <backup-count>1</backup-count> - <async-backup-count>0</async-backup-count> - </semaphore> - - <serialization> - <portable-version>0</portable-version> - </serialization> - - <services enable-defaults="true" /> -</hazelcast> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index ddcbed5..55a24ea 100644 --- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -17,14 +17,10 @@ cluster.name=contextElasticSearch index.name=context -elasticSearchConfig=file:${karaf.etc}/elasticsearch.yml 
 monthlyIndex.numberOfShards=3
 monthlyIndex.numberOfReplicas=0
 numberOfShards=5
 numberOfReplicas=0
-node.data=true
-discovery.zen.ping.multicast.enabled=false
-#discovery.zen.ping.unicast.hosts=["192.168.0.1:9300", "192.168.0.2:9300"]
 defaultQueryLimit=10

 # The following settings control the behavior of the BulkProcessor API. You can find more information about these
@@ -37,7 +33,5 @@
 bulkProcessor.bulkSize=5MB
 bulkProcessor.flushInterval=5s
 bulkProcessor.backoffPolicy=exponential
-cluster.group=default
-cluster.jmxUsername=karaf
-cluster.jmxPassword=karaf
-cluster.jmxPort=1099
\ No newline at end of file
+minimalElasticSearchVersion=5.0.0
+maximalElasticSearchVersion=5.2.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
----------------------------------------------------------------------
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 75c0a3b..a6b175f 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -452,4 +452,12 @@ public interface PersistenceService {
 * @return {@code true} if the operation was successful, {@code false} otherwise
 */
 boolean removeIndex(final String indexName);
+
+ /**
+ * Removes all data associated with the provided scope.
+ *
+ * @param scope the scope for which we want to remove data
+ */
+ void purge(final String scope);
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1aba4e5..6de2441 100644
--- a/pom.xml
+++ b/pom.xml
@@ -427,8 +427,6 @@
 <exclude>**/src/main/webapp/WEB-INF/beans.xml</exclude>
 <!-- Web application robots.txt file -->
 <exclude>**/src/main/webapp/robots.txt</exclude>
- <!-- Elastic search configuration files with (mostly) default configuration -->
- <exclude>**/src/main/resources/elasticsearch.yml</exclude>
 <!-- ignore generated log files -->
 <exclude>**/*.log</exclude>
 </excludes>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/services/pom.xml
----------------------------------------------------------------------
diff --git a/services/pom.xml b/services/pom.xml
index 7a58cd7..5188aea 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -112,6 +112,16 @@
 <groupId>commons-collections</groupId>
 <artifactId>commons-collections</artifactId>
 </dependency>
+ <dependency>
+ <groupId>org.apache.karaf.cellar</groupId>
+ <artifactId>org.apache.karaf.cellar.core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.karaf.cellar</groupId>
+ <artifactId>org.apache.karaf.cellar.config</artifactId>
+ <scope>provided</scope>
+ </dependency>
 </dependencies>

 <build>
@@ -156,6 +166,18 @@
 <type>cfg</type>
 <classifier>thirdpartycfg</classifier>
 </artifact>
+ <artifact>
+ <file>
+ src/main/resources/org.apache.unomi.cluster.cfg
+ </file>
+ <type>cfg</type>
+ <classifier>clustercfg</classifier>
+ </artifact>
+ <artifact>
+ <file>src/main/resources/hazelcast.xml</file>
+ <type>xml</type>
+ <classifier>hazelcastconfig</classifier>
+ </artifact>
 </artifacts>
 </configuration>
 </execution>
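A recurring change in this diff replaces `executeInClassLoader()` call sites with `catchingExecuteInClassLoader(true)`, so that `execute()` implementations can throw checked exceptions while most callers keep the old log-and-return-null behaviour. Below is a minimal standalone sketch of that pattern; the names mirror the diff, but this is an illustration, not the actual Unomi class (which also holds an Elasticsearch client and an SLF4J logger — and note the committed version always logs regardless of the `logError` flag, whereas this sketch honours it):

```java
// Sketch of the template-method pattern from the diff: run a task with this
// class's own class loader installed as the thread context class loader
// (TCCL), restore the caller's TCCL afterwards, and optionally swallow + log
// checked exceptions. Illustrative only -- not the actual Unomi implementation.
public abstract class InClassLoaderExecute<T> {

    protected abstract T execute(Object... args) throws Exception;

    public T executeInClassLoader(Object... args) throws Exception {
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            return execute(args);
        } finally {
            // always restore the original context class loader
            Thread.currentThread().setContextClassLoader(tccl);
        }
    }

    // Wrapper used by most call sites in the diff: exceptions are logged
    // (here: stderr, as a stand-in for the SLF4J logger) and null is returned.
    public T catchingExecuteInClassLoader(boolean logError, Object... args) {
        try {
            return executeInClassLoader(args);
        } catch (Exception e) {
            if (logError) {
                System.err.println("Error while executing in class loader: " + e);
            }
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        InClassLoaderExecute<String> ok = new InClassLoaderExecute<String>() {
            protected String execute(Object... a) {
                // inside execute() the TCCL is this class's own loader
                return Thread.currentThread().getContextClassLoader() == getClass().getClassLoader()
                        ? "swapped" : "not-swapped";
            }
        };
        System.out.println(ok.catchingExecuteInClassLoader(true)); // prints "swapped"

        InClassLoaderExecute<String> failing = new InClassLoaderExecute<String>() {
            protected String execute(Object... a) throws Exception {
                throw new Exception("boom");
            }
        };
        System.out.println(failing.catchingExecuteInClassLoader(true)); // prints "null"
    }
}
```

The TCCL swap matters in an OSGi container because libraries that use `ServiceLoader` or reflection consult the thread context class loader, which inside Karaf is generally not the bundle's own loader.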

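The removed `getClusterNodes()` implementation gathered per-node statistics by opening a remote `JMXConnector` to each Karaf node and proxying its Runtime and OperatingSystem MXBeans. A runnable sketch of that MXBean-proxy part is below; it uses the local platform MBean server instead of a remote connection so it runs standalone (the remote URL shown in the comment is the form used by the removed code):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import javax.management.MBeanServerConnection;

// Local, runnable sketch of what the removed getClusterNodes() did remotely:
// proxy the Runtime and OperatingSystem MXBeans through an
// MBeanServerConnection and read uptime / load figures from them.
public class NodeStatsSketch {
    public static void main(String[] args) throws Exception {
        // The removed code obtained this connection remotely, roughly via:
        //   JMXConnectorFactory.connect(
        //       new JMXServiceURL("service:jmx:rmi:///jndi/rmi://<host>:<jmxPort>/karaf-root"),
        //       env).getMBeanServerConnection();
        // Here we use the local platform server so the sketch is standalone.
        MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();

        RuntimeMXBean runtime = ManagementFactory.newPlatformMXBeanProxy(
                mbsc, ManagementFactory.RUNTIME_MXBEAN_NAME, RuntimeMXBean.class);
        OperatingSystemMXBean os = ManagementFactory.newPlatformMXBeanProxy(
                mbsc, ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME, OperatingSystemMXBean.class);

        System.out.println("uptime(ms)=" + runtime.getUptime());
        // -1.0 on platforms where the load average is unavailable (e.g. Windows)
        System.out.println("loadAverage=" + os.getSystemLoadAverage());
    }
}
```

The commit moves this logic out of the Elasticsearch persistence bundle into the new `ClusterServiceImpl` in the services module, which is why the JMX username/password/port settings migrate from `org.apache.unomi.persistence.elasticsearch.cfg` to `org.apache.unomi.cluster.cfg`.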