http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java new file mode 100644 index 0000000..a78cbc2 --- /dev/null +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -0,0 +1,1328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.unomi.persistence.elasticsearch; + +import org.apache.unomi.api.ClusterNode; +import org.apache.unomi.api.Item; +import org.apache.unomi.api.PartialList; +import org.apache.unomi.api.TimestampedItem; +import org.apache.unomi.api.conditions.Condition; +import org.apache.unomi.api.query.DateRange; +import org.apache.unomi.api.query.IpRange; +import org.apache.unomi.api.query.NumericRange; +import org.apache.unomi.api.services.ClusterService; +import org.apache.unomi.persistence.elasticsearch.conditions.*; +import org.apache.unomi.persistence.spi.CustomObjectMapper; +import org.apache.unomi.persistence.spi.PersistenceService; +import org.apache.unomi.persistence.spi.aggregate.*; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.percolate.PercolateResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.nodes.NodesOperationRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import 
org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.UnmodifiableIterator; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsException; +import org.elasticsearch.common.unit.DistanceUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.*; +import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.node.Node; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.*; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.filter.Filter; +import org.elasticsearch.search.aggregations.bucket.global.Global; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder; +import org.elasticsearch.search.aggregations.bucket.missing.MissingBuilder; +import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder; +import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeBuilder; +import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4RangeBuilder; +import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; +import org.elasticsearch.search.sort.GeoDistanceSortBuilder; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; +import org.osgi.framework.BundleContext; +import org.osgi.framework.BundleEvent; +import org.osgi.framework.ServiceReference; +import org.osgi.framework.SynchronousBundleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URL; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; + +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +@SuppressWarnings("rawtypes") +public class ElasticSearchPersistenceServiceImpl implements PersistenceService, ClusterService, SynchronousBundleListener { + + public static final long MILLIS_PER_DAY = 24L * 60L * 60L * 1000L; + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); + private Node node; + private Client client; + private String clusterName; + private String indexName; + private String monthlyIndexNumberOfShards; + private String monthlyIndexNumberOfReplicas; + private String numberOfShards; + private String numberOfReplicas; + private Boolean nodeData; + private Boolean discoveryEnabled; + private String elasticSearchConfig = null; + private BundleContext bundleContext; + private Map<String, String> mappings = new HashMap<String, String>(); + private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher; + private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher; + + private Map<String,String> indexNames; + private List<String> itemsMonthlyIndexed; + private Map<String, String> routingByType; + + private String address; + private String port; + private String secureAddress; + private String securePort; + + private Timer timer; + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public void setIndexName(String indexName) { + this.indexName = indexName; + } + + public void setMonthlyIndexNumberOfShards(String monthlyIndexNumberOfShards) { + this.monthlyIndexNumberOfShards = monthlyIndexNumberOfShards; + } + + public void 
setMonthlyIndexNumberOfReplicas(String monthlyIndexNumberOfReplicas) { + this.monthlyIndexNumberOfReplicas = monthlyIndexNumberOfReplicas; + } + + public void setDiscoveryEnabled(Boolean discoveryEnabled) { + this.discoveryEnabled = discoveryEnabled; + } + + public void setNumberOfShards(String numberOfShards) { + this.numberOfShards = numberOfShards; + } + + public void setNumberOfReplicas(String numberOfReplicas) { + this.numberOfReplicas = numberOfReplicas; + } + + public void setNodeData(Boolean nodeData) { + this.nodeData = nodeData; + } + + public void setAddress(String address) { + this.address = address; + } + + public void setPort(String port) { + this.port = port; + } + + public void setSecureAddress(String secureAddress) { + this.secureAddress = secureAddress; + } + + public void setSecurePort(String securePort) { + this.securePort = securePort; + } + + public void setItemsMonthlyIndexed(List<String> itemsMonthlyIndexed) { + this.itemsMonthlyIndexed = itemsMonthlyIndexed; + } + + public void setIndexNames(Map<String, String> indexNames) { + this.indexNames = indexNames; + } + + public void setRoutingByType(Map<String, String> routingByType) { + this.routingByType = routingByType; + } + + public void setElasticSearchConfig(String elasticSearchConfig) { + this.elasticSearchConfig = elasticSearchConfig; + } + + public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher conditionEvaluatorDispatcher) { + this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher; + } + + public void setConditionESQueryBuilderDispatcher(ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher) { + this.conditionESQueryBuilderDispatcher = conditionESQueryBuilderDispatcher; + } + + public void start() { + + loadPredefinedMappings(bundleContext, false); + + // on startup + new InClassLoaderExecute<Object>() { + public Object execute(Object... 
args) { + logger.info("Starting ElasticSearch persistence backend using cluster name " + clusterName + " and index name " + indexName + "..."); + Map<String, String> settings = null; + if (elasticSearchConfig != null && elasticSearchConfig.length() > 0) { + try { + URL elasticSearchConfigURL = new URL(elasticSearchConfig); + Settings.Builder settingsBuilder = ImmutableSettings.builder().loadFromUrl(elasticSearchConfigURL); + settings = settingsBuilder.build().getAsMap(); + logger.info("Successfully loaded ElasticSearch configuration from " + elasticSearchConfigURL); + } catch (MalformedURLException e) { + logger.error("Error in ElasticSearch configuration URL ", e); + } catch (SettingsException se) { + logger.info("Error trying to load settings from " + elasticSearchConfig + ": " + se.getMessage() + " (activate debug mode for exception details)"); + if (logger.isDebugEnabled()) { + logger.debug("Exception details", se); + } + } + } + + address = System.getProperty("contextserver.address", address); + port = System.getProperty("contextserver.port", port); + secureAddress = System.getProperty("contextserver.secureAddress", secureAddress); + securePort = System.getProperty("contextserver.securePort", securePort); + + ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder(); + if (settings != null) { + settingsBuilder.put(settings); + } + + settingsBuilder.put("cluster.name", clusterName) + .put("node.data", nodeData) + .put("discovery.zen.ping.multicast.enabled", discoveryEnabled) + .put("index.number_of_replicas", numberOfReplicas) + .put("index.number_of_shards", numberOfShards) + .put("node.contextserver.address", address) + .put("node.contextserver.port", port) + .put("node.contextserver.secureAddress", secureAddress) + .put("node.contextserver.securePort", securePort); + + node = nodeBuilder().settings(settingsBuilder).node(); + client = node.client(); + // @todo is there a better way to detect index existence than to wait for it to startup ? 
+ boolean indexExists = false; + int tries = 0; + while (!indexExists && tries < 20) { + IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet(); + indexExists = indicesExistsResponse.isExists(); + tries++; + try { + Thread.sleep(100); + } catch (InterruptedException e) { + logger.error("Interrupted", e); + } + } + if (!indexExists) { + logger.info("{} index doesn't exist yet, creating it...", indexName); + Map<String,String> indexMappings = new HashMap<String,String>(); + for (Map.Entry<String, String> entry : mappings.entrySet()) { + if (!itemsMonthlyIndexed.contains(entry.getKey()) && !indexNames.containsKey(entry.getKey())) { + indexMappings.put(entry.getKey(), entry.getValue()); + } + } + + internalCreateIndex(indexName, indexMappings); + } + + client.admin().indices().preparePutTemplate(indexName + "_monthlyindex") + .setTemplate(indexName + "-*") + .setOrder(1) + .setSettings(ImmutableSettings.settingsBuilder() + .put("number_of_shards", Integer.parseInt(monthlyIndexNumberOfShards)) + .put("number_of_replicas", Integer.parseInt(monthlyIndexNumberOfReplicas)) + .build()).execute().actionGet(); + + getMonthlyIndex(new Date(), true); + + return null; + } + }.executeInClassLoader(); + + + bundleContext.addBundleListener(this); + + try { + for (ServiceReference<ConditionEvaluator> reference : bundleContext.getServiceReferences(ConditionEvaluator.class, null)) { + ConditionEvaluator service = bundleContext.getService(reference); + conditionEvaluatorDispatcher.addEvaluator(reference.getProperty("conditionEvaluatorId").toString(), reference.getBundle().getBundleId(), service); + } + for (ServiceReference<ConditionESQueryBuilder> reference : bundleContext.getServiceReferences(ConditionESQueryBuilder.class, null)) { + ConditionESQueryBuilder service = bundleContext.getService(reference); + conditionESQueryBuilderDispatcher.addQueryBuilder(reference.getProperty("queryBuilderId").toString(), 
reference.getBundle().getBundleId(), service); + } + } catch (Exception e) { + logger.error("Cannot get services", e); + } + + timer = new Timer(); + + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + GregorianCalendar gc = new GregorianCalendar(); + int thisMonth = gc.get(Calendar.MONTH); + gc.add(Calendar.DAY_OF_MONTH, 1); + if (gc.get(Calendar.MONTH) != thisMonth) { + getMonthlyIndex(gc.getTime(), true); + } + } + }, 10000L, 24L * 60L * 60L * 1000L); + } + + public void stop() { + + new InClassLoaderExecute<Object>() { + protected Object execute(Object... args) { + logger.info("Closing ElasticSearch persistence backend..."); + node.close(); + return null; + } + }.executeInClassLoader(); + + if (timer != null) { + timer.cancel(); + timer = null; + } + + bundleContext.removeBundleListener(this); + } + + @Override + public void bundleChanged(BundleEvent event) { + switch (event.getType()) { + case BundleEvent.STARTED: + if (event.getBundle() != null && event.getBundle().getRegisteredServices() != null) { + for (ServiceReference<?> reference : event.getBundle().getRegisteredServices()) { + Object service = bundleContext.getService(reference); + if (service instanceof ConditionEvaluator) { + conditionEvaluatorDispatcher.addEvaluator(reference.getProperty("conditionEvaluatorId").toString(), event.getBundle().getBundleId(), (ConditionEvaluator) service); + } + if (service instanceof ConditionESQueryBuilder) { + conditionESQueryBuilderDispatcher.addQueryBuilder(reference.getProperty("queryBuilderId").toString(), event.getBundle().getBundleId(), (ConditionESQueryBuilder) service); + } + } + } + loadPredefinedMappings(event.getBundle().getBundleContext(), true); + break; + case BundleEvent.STOPPING: + conditionEvaluatorDispatcher.removeEvaluators(event.getBundle().getBundleId()); + conditionESQueryBuilderDispatcher.removeQueryBuilders(event.getBundle().getBundleId()); + break; + } + } + + private String getMonthlyIndex(Date date) { + return 
getMonthlyIndex(date, false); + } + + private String getMonthlyIndex(Date date, boolean checkAndCreate) { + String d = new SimpleDateFormat("-YYYY-MM").format(date); + String monthlyIndexName = indexName + d; + + if (checkAndCreate) { + IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(monthlyIndexName).execute().actionGet(); + boolean indexExists = indicesExistsResponse.isExists(); + if (!indexExists) { + logger.info("{} index doesn't exist yet, creating it...", monthlyIndexName); + + Map<String,String> indexMappings = new HashMap<String,String>(); + for (Map.Entry<String, String> entry : mappings.entrySet()) { + if (itemsMonthlyIndexed.contains(entry.getKey())) { + indexMappings.put(entry.getKey(), entry.getValue()); + } + } + + internalCreateIndex(monthlyIndexName, indexMappings); + logger.info("{} index created.", monthlyIndexName); + } + } + return monthlyIndexName; + } + + private void loadPredefinedMappings(BundleContext bundleContext, boolean createMapping) { + Enumeration<URL> predefinedMappings = bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true); + if (predefinedMappings == null) { + return; + } + while (predefinedMappings.hasMoreElements()) { + URL predefinedMappingURL = predefinedMappings.nextElement(); + logger.debug("Found mapping at " + predefinedMappingURL + ", loading... 
"); + try { + final String path = predefinedMappingURL.getPath(); + String name = path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.')); + BufferedReader reader = new BufferedReader(new InputStreamReader(predefinedMappingURL.openStream())); + + StringBuilder content = new StringBuilder(); + String l; + while ((l = reader.readLine()) != null) { + content.append(l); + } + mappings.put(name, content.toString()); + if (createMapping) { + if (itemsMonthlyIndexed.contains(name)) { + createMapping(name, content.toString(), indexName + "-*"); + } else if (indexNames.containsKey(name)) { + createMapping(name, content.toString(), indexNames.get(name)); + } else { + createMapping(name, content.toString(), indexName); + } + } + } catch (Exception e) { + logger.error("Error while loading segment definition " + predefinedMappingURL, e); + } + } + } + + + @Override + public <T extends Item> List<T> getAllItems(final Class<T> clazz) { + return getAllItems(clazz, 0, -1, null).getList(); + } + + @Override + public long getAllItemsCount(String itemType) { + return queryCount(FilterBuilders.matchAllFilter(), itemType); + } + + @Override + public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy) { + return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null); + } + + @Override + public <T extends Item> T load(final String itemId, final Class<T> clazz) { + return load(itemId, null, clazz); + } + + @Override + public <T extends Item> T load(final String itemId, final Date dateHint, final Class<T> clazz) { + return new InClassLoaderExecute<T>() { + protected T execute(Object... 
args) { + try { + String itemType = (String) clazz.getField("ITEM_TYPE").get(null); + + if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) { + PartialList<T> r = query(QueryBuilders.idsQuery(itemType).ids(itemId), null, clazz, 0, 1, null); + if (r.size() > 0) { + return r.get(0); + } + return null; + } else { + String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : + (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(dateHint) : indexName); + + GetResponse response = client.prepareGet(index, itemType, itemId) + .execute() + .actionGet(); + if (response.isExists()) { + String sourceAsString = response.getSourceAsString(); + final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); + value.setItemId(response.getId()); + return value; + } else { + return null; + } + } + } catch (IndexMissingException e) { + logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); + } catch (IllegalAccessException e) { + logger.error("Error loading itemType=" + clazz.getName() + "itemId=" + itemId, e); + } catch (Exception t) { + logger.error("Error loading itemType=" + clazz.getName() + "itemId=" + itemId, t); + } + return null; + } + }.executeInClassLoader(); + + } + + @Override + public boolean save(final Item item) { + + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + try { + String source = CustomObjectMapper.getObjectMapper().writeValueAsString(item); + String itemType = item.getItemType(); + String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : + (itemsMonthlyIndexed.contains(itemType) ? 
getMonthlyIndex(((TimestampedItem) item).getTimeStamp()) : indexName); + IndexRequestBuilder indexBuilder = client.prepareIndex(index, itemType, item.getItemId()) + .setSource(source); + if (routingByType.containsKey(itemType)) { + indexBuilder = indexBuilder.setRouting(routingByType.get(itemType)); + } + try { + indexBuilder.execute().actionGet(); + } catch (IndexMissingException e) { + if (itemsMonthlyIndexed.contains(itemType)) { + getMonthlyIndex(((TimestampedItem) item).getTimeStamp(), true); + indexBuilder.execute().actionGet(); + } + } + return true; + } catch (IOException e) { + logger.error("Error saving item " + item, e); + } + return false; + } + }.executeInClassLoader(); + + } + + @Override + public boolean update(final String itemId, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) { + return update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue)); + } + + @Override + public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) { + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + try { + String itemType = (String) clazz.getField("ITEM_TYPE").get(null); + + String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : + (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? 
getMonthlyIndex(dateHint) : indexName); + + client.prepareUpdate(index, itemType, itemId).setDoc(source) + .execute() + .actionGet(); + return true; + } catch (IndexMissingException e) { + logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); + } catch (NoSuchFieldException e) { + logger.error("Error updating item " + itemId, e); + } catch (IllegalAccessException e) { + logger.error("Error updating item " + itemId, e); + } + return false; + } + }.executeInClassLoader(); + } + + @Override + public boolean update(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) { + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + try { + String itemType = (String) clazz.getField("ITEM_TYPE").get(null); + + String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : + (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName); + + client.prepareUpdate(index, itemType, itemId).setScript(script, ScriptService.ScriptType.INLINE).setScriptParams(scriptParams) + .execute() + .actionGet(); + return true; + } catch (IndexMissingException e) { + logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); + } catch (NoSuchFieldException e) { + logger.error("Error updating item " + itemId, e); + } catch (IllegalAccessException e) { + logger.error("Error updating item " + itemId, e); + } + return false; + } + }.executeInClassLoader(); + } + + @Override + public <T extends Item> boolean remove(final String itemId, final Class<T> clazz) { + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... 
args) { + //Index the query = register it in the percolator + try { + String itemType = (String) clazz.getField("ITEM_TYPE").get(null); + + client.prepareDelete(getIndexNameForQuery(itemType), itemType, itemId) + .execute().actionGet(); + return true; + } catch (Exception e) { + logger.error("Cannot remove", e); + } + return false; + } + }.executeInClassLoader(); + } + + public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) { + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + try { + String itemType = (String) clazz.getField("ITEM_TYPE").get(null); + + client.prepareDeleteByQuery(getIndexNameForQuery(itemType)) + .setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query)) + .execute().actionGet(); + return true; + } catch (Exception e) { + logger.error("Cannot remove by query", e); + } + return false; + } + }.executeInClassLoader(); + } + + public boolean createIndex(final String indexName) { + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet(); + boolean indexExists = indicesExistsResponse.isExists(); + if (!indexExists) { + Map<String,String> indexMappings = new HashMap<String,String>(); + for (Map.Entry<String, String> entry : mappings.entrySet()) { + if (indexNames.containsKey(entry.getKey()) && indexNames.get(entry.getKey()).equals(indexName)) { + indexMappings.put(entry.getKey(), entry.getValue()); + } + } + internalCreateIndex(indexName, indexMappings); + } + return !indexExists; + } + }.executeInClassLoader(); + } + + public boolean removeIndex(final String indexName) { + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... 
args) { + IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet(); + boolean indexExists = indicesExistsResponse.isExists(); + if (indexExists) { + client.admin().indices().prepareDelete(indexName).execute().actionGet(); + } + return indexExists; + } + }.executeInClassLoader(); + } + + private void internalCreateIndex(String indexName, Map<String,String> mappings) { + CreateIndexRequestBuilder builder = client.admin().indices().prepareCreate(indexName) + .setSettings("{\n" + + " \"analysis\": {\n" + + " \"tokenizer\": {\n" + + " \"myTokenizer\": {\n" + + " \"type\":\"pattern\",\n" + + " \"pattern\":\".*\",\n" + + " \"group\":0\n" + + " }\n" + + " },\n" + + " \"analyzer\": {\n" + + " \"folding\": {\n" + + " \"type\":\"custom\",\n" + + " \"tokenizer\": \"myTokenizer\",\n" + + " \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n"); + + for (Map.Entry<String, String> entry : mappings.entrySet()) { + builder.addMapping(entry.getKey(), entry.getValue()); + } + + builder.execute().actionGet(); + } + + + private boolean createMapping(final String type, final String source, final String indexName) { + client.admin().indices() + .preparePutMapping(indexName) + .setType(type) + .setSource(source) + .execute().actionGet(); + return true; + } + + @Override + public Map<String, Map<String, Object>> getMapping(final String itemType) { + return new InClassLoaderExecute<Map<String, Map<String, Object>>>() { + @SuppressWarnings("unchecked") + protected Map<String, Map<String, Object>> execute(Object... 
args) { + GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings().setTypes(itemType).execute().actionGet(); + ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getMappingsResponse.getMappings(); + Map<String, Map<String, Object>> propertyMap = new HashMap<>(); + try { + UnmodifiableIterator<ImmutableOpenMap<String, MappingMetaData>> it = mappings.valuesIt(); + while (it.hasNext()) { + ImmutableOpenMap<String, MappingMetaData> next = it.next(); + Map<String, Map<String, Object>> properties = (Map<String, Map<String, Object>>) next.get(itemType).getSourceAsMap().get("properties"); + for (Map.Entry<String, Map<String, Object>> entry : properties.entrySet()) { + if (propertyMap.containsKey(entry.getKey())) { + Map<String, Object> subPropMap = propertyMap.get(entry.getKey()); + for (Map.Entry<String, Object> subentry : entry.getValue().entrySet()) { + if (subPropMap.containsKey(subentry.getKey()) && subPropMap.get(subentry.getKey()) instanceof Map && subentry.getValue() instanceof Map) { + ((Map) subPropMap.get(subentry.getKey())).putAll((Map) subentry.getValue()); + } else { + subPropMap.put(subentry.getKey(), subentry.getValue()); + } + } + } else { + propertyMap.put(entry.getKey(), entry.getValue()); + } + } + } + } catch (IOException e) { + logger.error("Cannot get mapping", e); + } + return propertyMap; + } + }.executeInClassLoader(); + } + + public boolean saveQuery(final String queryName, final String query) { + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... 
args) { + //Index the query = register it in the percolator + try { + logger.info("Saving query : " + queryName); + client.prepareIndex(indexName, ".percolator", queryName) + .setSource(query) + .setRefresh(true) // Needed when the query shall be available immediately + .execute().actionGet(); + return true; + } catch (Exception e) { + logger.error("Cannot save query", e); + } + return false; + } + }.executeInClassLoader(); + } + + @Override + public boolean saveQuery(String queryName, Condition query) { + if (query == null) { + return false; + } + saveQuery(queryName, conditionESQueryBuilderDispatcher.getQuery(query)); + return true; + } + + @Override + public boolean removeQuery(final String queryName) { + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + //Index the query = register it in the percolator + try { + client.prepareDelete(indexName, ".percolator", queryName) + .setRefresh(true) // Needed when the query shall be available immediately + .execute().actionGet(); + return true; + } catch (Exception e) { + logger.error("Cannot delete query", e); + } + return false; + } + }.executeInClassLoader(); + } + + @Override + public List<String> getMatchingSavedQueries(final Item item) { + return new InClassLoaderExecute<List<String>>() { + protected List<String> execute(Object... 
args) { + List<String> matchingQueries = new ArrayList<String>(); + try { + String source = CustomObjectMapper.getObjectMapper().writeValueAsString(item); + + String itemType = item.getItemType(); + + //Percolate + PercolateResponse response = client.preparePercolate() + .setIndices(indexName) + .setDocumentType(itemType) + .setSource("{doc:" + source + "}").execute().actionGet(); + //Iterate over the results + for (PercolateResponse.Match match : response) { + //Handle the result which is the name of + //the query in the percolator + matchingQueries.add(match.getId().string()); + } + } catch (IOException e) { + logger.error("Error getting matching saved queries for item=" + item, e); + } + return matchingQueries; + } + }.executeInClassLoader(); + + } + + @Override + public boolean testMatch(Condition query, Item item) { + try { + return conditionEvaluatorDispatcher.eval(query, item); + } catch (UnsupportedOperationException e) { + logger.error("Eval not supported, continue with query", e); + } + try { + final Class<? 
extends Item> clazz = item.getClass(); + String itemType = (String) clazz.getField("ITEM_TYPE").get(null); + + FilterBuilder builder = FilterBuilders.andFilter( + FilterBuilders.idsFilter(itemType).ids(item.getItemId()), + conditionESQueryBuilderDispatcher.buildFilter(query)); + return queryCount(builder, itemType) > 0; + } catch (IllegalAccessException e) { + logger.error("Error getting query for item=" + item, e); + } catch (NoSuchFieldException e) { + logger.error("Error getting query for item=" + item, e); + } + return false; + } + + @Override + public <T extends Item> List<T> query(final Condition query, String sortBy, final Class<T> clazz) { + return query(query, sortBy, clazz, 0, -1).getList(); + } + + @Override + public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) { + return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null); + } + + @Override + public <T extends Item> PartialList<T> queryFullText(final String fulltext, final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) { + return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(conditionESQueryBuilderDispatcher.getQueryBuilder(query)), sortBy, clazz, offset, size, null); + } + + @Override + public <T extends Item> List<T> query(final String fieldName, final String fieldValue, String sortBy, final Class<T> clazz) { + return query(fieldName, fieldValue, sortBy, clazz, 0, -1).getList(); + } + + @Override + public <T extends Item> List<T> query(final String fieldName, final String[] fieldValues, String sortBy, final Class<T> clazz) { + return query(QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues)), sortBy, clazz, 0, -1, getRouting(fieldName, fieldValues, clazz)).getList(); + } + + @Override + public <T extends Item> PartialList<T> query(String 
fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) { + return query(QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz)); + } + + @Override + public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) { + return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(QueryBuilders.termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz)); + } + + @Override + public <T extends Item> PartialList<T> queryFullText(String fulltext, String sortBy, Class<T> clazz, int offset, int size) { + return query(QueryBuilders.queryStringQuery(fulltext).defaultField("_all"), sortBy, clazz, offset, size, getRouting("_all", new String[]{fulltext}, clazz)); + } + + @Override + public <T extends Item> PartialList<T> rangeQuery(String fieldName, String from, String to, String sortBy, Class<T> clazz, int offset, int size) { + RangeQueryBuilder builder = QueryBuilders.rangeQuery(fieldName); + builder.from(from); + builder.to(to); + return query(builder, sortBy, clazz, offset, size, null); + } + + @Override + public long queryCount(Condition query, String itemType) { + return queryCount(conditionESQueryBuilderDispatcher.buildFilter(query), itemType); + } + + private long queryCount(final FilterBuilder filter, final String itemType) { + return new InClassLoaderExecute<Long>() { + + @Override + protected Long execute(Object... 
args) { + SearchResponse response = client.prepareSearch(getIndexNameForQuery(itemType)) + .setTypes(itemType) + .setSearchType(SearchType.COUNT) + .setQuery(QueryBuilders.matchAllQuery()) + .addAggregation(AggregationBuilders.filter("filter").filter(filter)) + .execute() + .actionGet(); + Aggregations searchHits = response.getAggregations(); + Filter filter = searchHits.get("filter"); + return filter.getDocCount(); + } + }.executeInClassLoader(); + } + + private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing) { + return new InClassLoaderExecute<PartialList<T>>() { + + @Override + protected PartialList<T> execute(Object... args) { + List<T> results = new ArrayList<T>(); + long totalHits = 0; + try { + String itemType = getItemType(clazz); + + SearchRequestBuilder requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType)) + .setTypes(itemType) + .setFetchSource(true) + .setQuery(query) + .setFrom(offset); + if (size != -1) { + requestBuilder.setSize(size); + } else { + requestBuilder.setSize(Integer.MAX_VALUE); + } + if (routing != null) { + requestBuilder.setRouting(routing); + } + if (sortBy != null) { + String[] sortByArray = sortBy.split(","); + for (String sortByElement : sortByArray) { + if (sortByElement.startsWith("geo:")) { + String[] elements = sortByElement.split(":"); + GeoDistanceSortBuilder distanceSortBuilder = SortBuilders.geoDistanceSort(elements[1]).point(Double.parseDouble(elements[2]), Double.parseDouble(elements[3])).unit(DistanceUnit.KILOMETERS); + if (elements.length > 4 && elements[4].equals("desc")) { + requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.DESC)); + } else { + requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.ASC)); + } + } else { + if (sortByElement.endsWith(":desc")) { + requestBuilder = requestBuilder.addSort(sortByElement.substring(0, 
sortByElement.length() - ":desc".length()), SortOrder.DESC); + } else if (sortByElement.endsWith(":asc")) { + requestBuilder = requestBuilder.addSort(sortByElement.substring(0, sortByElement.length() - ":asc".length()), SortOrder.ASC); + } else { + requestBuilder = requestBuilder.addSort(sortByElement, SortOrder.ASC); + } + } + } + } + SearchResponse response = requestBuilder + .execute() + .actionGet(); + SearchHits searchHits = response.getHits(); + totalHits = searchHits.getTotalHits(); + for (SearchHit searchHit : searchHits) { + String sourceAsString = searchHit.getSourceAsString(); + final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz); + value.setItemId(searchHit.getId()); + results.add(value); + } + } catch (Exception t) { + logger.error("Error loading itemType=" + clazz.getName() + "query=" + query, t); + } + + return new PartialList<T>(results, offset, size, totalHits); + } + }.executeInClassLoader(); + } + + @Override + public Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType) { + return new InClassLoaderExecute<Map<String, Long>>() { + + @Override + protected Map<String, Long> execute(Object... 
args) { + Map<String, Long> results = new LinkedHashMap<String, Long>(); + + SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType)) + .setTypes(itemType) + .setSearchType(SearchType.COUNT) + .setQuery(QueryBuilders.matchAllQuery()); + + List<AggregationBuilder> lastAggregation = new ArrayList<AggregationBuilder>(); + + if (aggregate != null) { + AggregationBuilder bucketsAggregation = null; + if (aggregate instanceof DateAggregate) { + DateAggregate dateAggregate = (DateAggregate) aggregate; + DateHistogramBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(aggregate.getField()).interval(new DateHistogram.Interval((dateAggregate.getInterval()))); + if (dateAggregate.getFormat() != null) { + dateHistogramBuilder.format(dateAggregate.getFormat()); + } + bucketsAggregation = dateHistogramBuilder; + } else if (aggregate instanceof NumericRangeAggregate) { + RangeBuilder rangebuilder = AggregationBuilders.range("buckets").field(aggregate.getField()); + for (NumericRange range : ((NumericRangeAggregate) aggregate).getRanges()) { + if (range != null) { + if (range.getFrom() != null && range.getTo() != null) { + rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo()); + } else if (range.getFrom() != null) { + rangebuilder.addUnboundedFrom(range.getKey(), range.getFrom()); + } else if (range.getTo() != null) { + rangebuilder.addUnboundedTo(range.getKey(), range.getTo()); + } + } + } + bucketsAggregation = rangebuilder; + } else if (aggregate instanceof DateRangeAggregate) { + DateRangeAggregate dateRangeAggregate = (DateRangeAggregate) aggregate; + DateRangeBuilder rangebuilder = AggregationBuilders.dateRange("buckets").field(aggregate.getField()); + if (dateRangeAggregate.getFormat() != null) { + rangebuilder.format(dateRangeAggregate.getFormat()); + } + for (DateRange range : dateRangeAggregate.getDateRanges()) { + if (range != null) { + rangebuilder.addRange(range.getKey(), range.getFrom(), 
range.getTo()); + } + } + bucketsAggregation = rangebuilder; + } else if (aggregate instanceof IpRangeAggregate) { + IpRangeAggregate ipRangeAggregate = (IpRangeAggregate) aggregate; + IPv4RangeBuilder rangebuilder = AggregationBuilders.ipRange("buckets").field(aggregate.getField()); + for (IpRange range : ipRangeAggregate.getRanges()) { + if (range != null) { + rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo()); + } + } + bucketsAggregation = rangebuilder; + } else { + //default + bucketsAggregation = AggregationBuilders.terms("buckets").field(aggregate.getField()).size(Integer.MAX_VALUE); + } + if (bucketsAggregation != null) { + final MissingBuilder missingBucketsAggregation = AggregationBuilders.missing("missing").field(aggregate.getField()); + for (AggregationBuilder aggregationBuilder : lastAggregation) { + bucketsAggregation.subAggregation(aggregationBuilder); + missingBucketsAggregation.subAggregation(aggregationBuilder); + } + lastAggregation = Arrays.asList(bucketsAggregation, missingBucketsAggregation); + } + } + + if (filter != null) { + AggregationBuilder filterAggregation = AggregationBuilders.filter("filter").filter(conditionESQueryBuilderDispatcher.buildFilter(filter)); + for (AggregationBuilder aggregationBuilder : lastAggregation) { + filterAggregation.subAggregation(aggregationBuilder); + } + lastAggregation = Collections.singletonList(filterAggregation); + } + + + AggregationBuilder globalAggregation = AggregationBuilders.global("global"); + for (AggregationBuilder aggregationBuilder : lastAggregation) { + globalAggregation.subAggregation(aggregationBuilder); + } + + builder.addAggregation(globalAggregation); + + SearchResponse response = builder.execute().actionGet(); + + Aggregations aggregations = response.getAggregations(); + if (aggregations != null) { + Global globalAgg = aggregations.get("global"); + results.put("_all", globalAgg.getDocCount()); + aggregations = globalAgg.getAggregations(); + + if 
(aggregations.get("filter") != null) { + Filter filterAgg = aggregations.get("filter"); + results.put("_filtered", filterAgg.getDocCount()); + aggregations = filterAgg.getAggregations(); + } + if (aggregations.get("buckets") != null) { + MultiBucketsAggregation terms = aggregations.get("buckets"); + for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) { + results.put(bucket.getKey(), bucket.getDocCount()); + } + SingleBucketAggregation missing = aggregations.get("missing"); + if (missing.getDocCount() > 0) { + results.put("_missing", missing.getDocCount()); + } + } + } + + return results; + } + }.executeInClassLoader(); + } + + private <T extends Item> String getItemType(Class<T> clazz) { + try { + return (String) clazz.getField("ITEM_TYPE").get(null); + } catch (NoSuchFieldException e) { + logger.error("Class " + clazz.getName() + " doesn't define a publicly accessible ITEM_TYPE field", e); + } catch (IllegalAccessException e) { + logger.error("Error loading itemType=" + clazz.getName(), e); + } + return null; + } + + private <T extends Item> String[] getRouting(String fieldName, String[] fieldValues, Class<T> clazz) { + String itemType = getItemType(clazz); + String[] routing = null; + if (routingByType.containsKey(itemType) && routingByType.get(itemType).equals(fieldName)) { + routing = fieldValues; + } + return routing; + } + + + @Override + public List<ClusterNode> getClusterNodes() { + return new InClassLoaderExecute<List<ClusterNode>>() { + + @Override + protected List<ClusterNode> execute(Object... 
args) { + Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, ClusterNode>(); + + NodesInfoResponse nodesInfoResponse = client.admin().cluster().prepareNodesInfo(NodesOperationRequest.ALL_NODES) + .setSettings(true) + .execute() + .actionGet(); + NodeInfo[] nodesInfoArray = nodesInfoResponse.getNodes(); + for (NodeInfo nodeInfo : nodesInfoArray) { + if (nodeInfo.getSettings().get("node.contextserver.address") != null) { + ClusterNode clusterNode = new ClusterNode(); + clusterNode.setHostName(nodeInfo.getHostname()); + clusterNode.setHostAddress(nodeInfo.getSettings().get("node.contextserver.address")); + clusterNode.setPublicPort(Integer.parseInt(nodeInfo.getSettings().get("node.contextserver.port"))); + clusterNode.setSecureHostAddress(nodeInfo.getSettings().get("node.contextserver.secureAddress")); + clusterNode.setSecurePort(Integer.parseInt(nodeInfo.getSettings().get("node.contextserver.securePort"))); + clusterNode.setMaster(nodeInfo.getNode().isMasterNode()); + clusterNode.setData(nodeInfo.getNode().isDataNode()); + clusterNodes.put(nodeInfo.getNode().getId(), clusterNode); + } + } + + NodesStatsResponse nodesStatsResponse = client.admin().cluster().prepareNodesStats(NodesOperationRequest.ALL_NODES) + .setOs(true) + .setProcess(true) + .execute() + .actionGet(); + NodeStats[] nodeStatsArray = nodesStatsResponse.getNodes(); + for (NodeStats nodeStats : nodeStatsArray) { + ClusterNode clusterNode = clusterNodes.get(nodeStats.getNode().getId()); + if (clusterNode != null) { + // the following may be null in the case where Sigar didn't initialize properly, for example + // because the native libraries were not installed or if we redeployed the OSGi bundle in which + // case Sigar cannot initialize properly since it tries to reload the native libraries, generates + // an error and doesn't initialize properly. 
+ if (nodeStats.getProcess() != null && nodeStats.getProcess().getCpu() != null) { + clusterNode.setCpuLoad(nodeStats.getProcess().getCpu().getPercent()); + } + if (nodeStats.getOs() != null) { + clusterNode.setLoadAverage(nodeStats.getOs().getLoadAverage()); + clusterNode.setUptime(nodeStats.getOs().getUptime().getMillis()); + } + } + } + + return new ArrayList<ClusterNode>(clusterNodes.values()); + } + }.executeInClassLoader(); + } + + @Override + public void refresh() { + new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) { + client.admin().indices().refresh(Requests.refreshRequest()).actionGet(); + return true; + } + }.executeInClassLoader(); + + } + + + @Override + public void purge(final Date date) { + new InClassLoaderExecute<Object>() { + @Override + protected Object execute(Object... args) { + IndicesStatsResponse statsResponse = client.admin().indices().prepareStats(indexName + "-*") + .setIndexing(false) + .setGet(false) + .setSearch(false) + .setWarmer(false) + .setMerge(false) + .setFieldData(false) + .setFlush(false) + .setCompletion(false) + .setRefresh(false) + .setSuggest(false) + .execute() + .actionGet(); + + SimpleDateFormat d = new SimpleDateFormat("yyyy-MM"); + + List<String> toDelete = new ArrayList<String>(); + for (String currentIndexName : statsResponse.getIndices().keySet()) { + if (currentIndexName.startsWith(indexName + "-")) { + try { + Date indexDate = d.parse(currentIndexName.substring(indexName.length() + 1)); + + if (indexDate.before(date)) { + toDelete.add(currentIndexName); + } + } catch (ParseException e) { + logger.error("Cannot parse index name " + currentIndexName, e); + } + } + } + if (!toDelete.isEmpty()) { + client.admin().indices().prepareDelete(toDelete.toArray(new String[toDelete.size()])).execute().actionGet(); + } + return null; + } + }.executeInClassLoader(); + } + + @Override + public void purge(final String scope) { + new InClassLoaderExecute<Void>() { + @Override + protected Void 
execute(Object... args) { + QueryBuilder query = QueryBuilders.termQuery("scope", ConditionContextHelper.foldToASCII(scope)); + + BulkRequestBuilder deleteByScope = client.prepareBulk(); + + final TimeValue keepAlive = TimeValue.timeValueHours(1); + SearchResponse response = client.prepareSearch(indexName + "*") + .setSearchType(SearchType.SCAN) + .setScroll(keepAlive) + .setQuery(query) + .setSize(100).execute().actionGet(); + + // Scroll until no more hits are returned + while (true) { + + for (SearchHit hit : response.getHits().getHits()) { + // add hit to bulk delete + deleteByScope.add(Requests.deleteRequest(hit.index()).type(hit.type()).id(hit.id())); + } + + response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet(); + + // If we have no more hits, exit + if (response.getHits().getHits().length == 0) { + break; + } + } + + // we're done with the scrolling, delete now + if (deleteByScope.numberOfActions() > 0) { + final BulkResponse deleteResponse = deleteByScope.get(); + if (deleteResponse.hasFailures()) { + // do something + logger.debug("Couldn't delete from scope " + scope + ":\n{}", deleteResponse.buildFailureMessage()); + } + } + + return null; + } + }.executeInClassLoader(); + } + + @Override + public Map<String, Double> getSingleValuesMetrics(final Condition condition, final String[] metrics, final String field, final String itemType) { + return new InClassLoaderExecute<Map<String, Double>>() { + + @Override + protected Map<String, Double> execute(Object... 
args) { + Map<String, Double> results = new LinkedHashMap<String, Double>(); + + SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType)) + .setTypes(itemType) + .setSearchType(SearchType.COUNT) + .setQuery(QueryBuilders.matchAllQuery()); + AggregationBuilder filterAggregation = AggregationBuilders.filter("metrics").filter(conditionESQueryBuilderDispatcher.buildFilter(condition)); + + if (metrics != null) { + for (String metric : metrics) { + switch (metric) { + case "sum": + filterAggregation.subAggregation(AggregationBuilders.sum("sum").field(field)); + break; + case "avg": + filterAggregation.subAggregation(AggregationBuilders.avg("avg").field(field)); + break; + case "min": + filterAggregation.subAggregation(AggregationBuilders.min("min").field(field)); + break; + case "max": + filterAggregation.subAggregation(AggregationBuilders.max("max").field(field)); + break; + } + } + } + builder.addAggregation(filterAggregation); + SearchResponse response = builder.execute().actionGet(); + + Aggregations aggregations = response.getAggregations(); + if (aggregations != null) { + Aggregation metricsResults = aggregations.get("metrics"); + if (metricsResults instanceof HasAggregations) { + aggregations = ((HasAggregations) metricsResults).getAggregations(); + for (Aggregation aggregation : aggregations) { + InternalNumericMetricsAggregation.SingleValue singleValue = (InternalNumericMetricsAggregation.SingleValue) aggregation; + results.put("_" + singleValue.getName(), singleValue.value()); + } + } + } + return results; + } + }.executeInClassLoader(); + } + + private String getIndexNameForQuery(String itemType) { + return indexNames.containsKey(itemType) ? indexNames.get(itemType) : + (itemsMonthlyIndexed.contains(itemType) ? indexName + "-*" : indexName); + } + + public abstract static class InClassLoaderExecute<T> { + + protected abstract T execute(Object... args); + + public T executeInClassLoader(Object... 
args) { + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + return execute(args); + } finally { + Thread.currentThread().setContextClassLoader(tccl); + } + } + } + + +}
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionContextHelper.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionContextHelper.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionContextHelper.java new file mode 100644 index 0000000..3666d8d --- /dev/null +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionContextHelper.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.unomi.persistence.elasticsearch.conditions; + +import org.apache.commons.lang3.StringUtils; +import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter; +import org.apache.lucene.util.ArrayUtil; +import org.apache.unomi.api.conditions.Condition; +import org.elasticsearch.common.base.Function; +import org.elasticsearch.common.collect.Lists; +import org.mvel2.MVEL; +import org.mvel2.ParserConfiguration; +import org.mvel2.ParserContext; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ConditionContextHelper { + private static Map<String,Serializable> mvelExpressions = new ConcurrentHashMap<>(); + + public static Condition getContextualCondition(Condition condition, Map<String, Object> context) { + if (context.isEmpty() || !hasContextualParameter(condition.getParameterValues())) { + return condition; + } + @SuppressWarnings("unchecked") + Map<String, Object> values = (Map<String, Object>) parseParameter(context, condition.getParameterValues()); + if (values == null) { + return null; + } + Condition n = new Condition(condition.getConditionType()); + n.setParameterValues(values); + return n; + } + + @SuppressWarnings("unchecked") + private static Object parseParameter(Map<String, Object> context, Object value) { + if (value instanceof String) { + if (((String) value).startsWith("parameter::") || ((String) value).startsWith("script::")) { + String s = (String) value; + if (s.startsWith("parameter::")) { + return context.get(StringUtils.substringAfter(s, "parameter::")); + } else if (s.startsWith("script::")) { + String script = StringUtils.substringAfter(s, "script::"); + if (!mvelExpressions.containsKey(script)) { + ParserConfiguration parserConfiguration = new ParserConfiguration(); + parserConfiguration.setClassLoader(ConditionContextHelper.class.getClassLoader()); + 
mvelExpressions.put(script,MVEL.compileExpression(script, new ParserContext(parserConfiguration))); + } + return MVEL.executeExpression(mvelExpressions.get(script), context); + } + } + } else if (value instanceof Map) { + Map<String, Object> values = new HashMap<String, Object>(); + for (Map.Entry<String, Object> entry : ((Map<String, Object>) value).entrySet()) { + Object parameter = parseParameter(context, entry.getValue()); + if (parameter == null) { + return null; + } + values.put(entry.getKey(), parameter); + } + return values; + } else if (value instanceof List) { + List<Object> values = new ArrayList<Object>(); + for (Object o : ((List<?>) value)) { + Object parameter = parseParameter(context, o); + if (parameter != null) { + values.add(parameter); + } + } + return values; + } + return value; + } + + private static boolean hasContextualParameter(Object value) { + if (value instanceof String) { + if (((String) value).startsWith("parameter::") || ((String) value).startsWith("script::")) { + return true; + } + } else if (value instanceof Map) { + for (Object o : ((Map<?, ?>) value).values()) { + if (hasContextualParameter(o)) { + return true; + } + } + } else if (value instanceof List) { + for (Object o : ((List<?>) value)) { + if (hasContextualParameter(o)) { + return true; + } + } + } + return false; + } + + public static String[] foldToASCII(String[] s) { + if (s != null) { + for (int i = 0; i < s.length; i++) { + s[i] = foldToASCII(s[i]); + } + } + return s; + } + + public static String foldToASCII(String s) { + if (s != null) { + s = s.toLowerCase(); + int maxSizeNeeded = 4 * s.length(); + char[] output = new char[ArrayUtil.oversize(maxSizeNeeded, 2)]; + int length = ASCIIFoldingFilter.foldToASCII(s.toCharArray(), 0, output, 0, s.length()); + return new String(output, 0, length); + } + return null; + } + + public static <T> List<T> foldToASCII(List<T> s) { + if (s != null) { + return Lists.transform(s, new Function<T, T>() { + @Override + public T apply(T 
o) { + if (o instanceof String) { + return (T) ConditionContextHelper.foldToASCII((String) o); + } + return o; + } + }); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java new file mode 100644 index 0000000..cc2dc89 --- /dev/null +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Translates one kind of condition into an Elasticsearch filter.
 * <p>
 * Implementations are registered by name with {@link ConditionESQueryBuilderDispatcher}, which
 * selects one through the query builder key of the condition's type.
 */
public interface ConditionESQueryBuilder {

    /**
     * Builds the filter for a condition.
     *
     * @param condition  the condition to translate; the dispatcher passes it after contextual
     *                   parameters have been resolved
     * @param context    the context values the dispatcher is carrying for this condition
     * @param dispatcher the dispatcher that invoked this builder
     * @return the filter corresponding to the condition
     */
    FilterBuilder buildFilter(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher);

}
+ */ + +package org.apache.unomi.persistence.elasticsearch.conditions; + +import org.apache.unomi.api.conditions.Condition; +import org.elasticsearch.index.query.FilterBuilder; +import org.elasticsearch.index.query.FilterBuilders; +import org.elasticsearch.index.query.FilteredQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.osgi.framework.BundleContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ConditionESQueryBuilderDispatcher { + private static final Logger logger = LoggerFactory.getLogger(ConditionESQueryBuilderDispatcher.class.getName()); + + private BundleContext bundleContext; + private Map<String, ConditionESQueryBuilder> queryBuilders = new ConcurrentHashMap<>(); + private Map<Long, List<String>> queryBuildersByBundle = new ConcurrentHashMap<>(); + + public ConditionESQueryBuilderDispatcher() { + } + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + + public void addQueryBuilder(String name, long bundleId, ConditionESQueryBuilder evaluator) { + queryBuilders.put(name, evaluator); + if (!queryBuildersByBundle.containsKey(bundleId)) { + queryBuildersByBundle.put(bundleId, new ArrayList<String>()); + } + queryBuildersByBundle.get(bundleId).add(name); + } + + public void removeQueryBuilders(long bundleId) { + if (queryBuildersByBundle.containsKey(bundleId)) { + for (String s : queryBuildersByBundle.get(bundleId)) { + queryBuilders.remove(s); + } + queryBuildersByBundle.remove(bundleId); + } + } + + public String getQuery(Condition condition) { + return "{\"query\": " + getQueryBuilder(condition).toString() + "}"; + } + + public FilteredQueryBuilder getQueryBuilder(Condition condition) { + return QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), buildFilter(condition)); + } + + public 
FilterBuilder buildFilter(Condition condition) { + return buildFilter(condition, new HashMap<String, Object>()); + } + + public FilterBuilder buildFilter(Condition condition, Map<String, Object> context) { + if(condition == null || condition.getConditionType() == null) { + throw new IllegalArgumentException("Condition is null or doesn't have type, impossible to build filter"); + } + + String queryBuilderKey = condition.getConditionType().getQueryBuilder(); + if (queryBuilderKey == null && condition.getConditionType().getParentCondition() != null) { + context.putAll(condition.getParameterValues()); + return buildFilter(condition.getConditionType().getParentCondition(), context); + } + + if (queryBuilderKey == null) { + throw new UnsupportedOperationException("No query builder defined for : " + condition.getConditionTypeId()); + } + + if (queryBuilders.containsKey(queryBuilderKey)) { + ConditionESQueryBuilder queryBuilder = queryBuilders.get(queryBuilderKey); + Condition contextualCondition = ConditionContextHelper.getContextualCondition(condition, context); + if (contextualCondition != null) { + return queryBuilder.buildFilter(contextualCondition, context, this); + } + } + + // if no matching + return FilterBuilders.matchAllFilter(); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluator.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluator.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluator.java new file mode 100644 index 0000000..86f8f52 --- /dev/null +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluator.java @@ -0,0 +1,32 @@ +/* + * 
/**
 * Evaluates one kind of condition against an item in memory.
 * <p>
 * Implementations are registered by name with {@link ConditionEvaluatorDispatcher}.
 */
public interface ConditionEvaluator {

    /**
     * Tests an item against a condition.
     *
     * @param condition  the condition to evaluate; the dispatcher passes it after contextual
     *                   parameters have been resolved
     * @param item       the item being tested
     * @param context    the context values the dispatcher is carrying for this condition
     * @param dispatcher the dispatcher that invoked this evaluator
     * @return {@code true} if the item satisfies the condition
     */
    boolean eval(Condition condition, Item item, Map<String, Object> context, ConditionEvaluatorDispatcher dispatcher);

}
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.unomi.persistence.elasticsearch.conditions; + +import org.apache.unomi.api.Item; +import org.apache.unomi.api.conditions.Condition; +import org.osgi.framework.BundleContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Entry point for condition evaluation. Will dispatch to all evaluators. 
+ */ +public class ConditionEvaluatorDispatcher { + private static final Logger logger = LoggerFactory.getLogger(ConditionEvaluatorDispatcher.class.getName()); + + private BundleContext bundleContext; + private Map<String, ConditionEvaluator> evaluators = new ConcurrentHashMap<>(); + private Map<Long, List<String>> evaluatorsByBundle = new ConcurrentHashMap<>(); + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + + public void addEvaluator(String name, long bundleId, ConditionEvaluator evaluator) { + evaluators.put(name, evaluator); + if (!evaluatorsByBundle.containsKey(bundleId)) { + evaluatorsByBundle.put(bundleId, new ArrayList<String>()); + } + evaluatorsByBundle.get(bundleId).add(name); + } + + public void removeEvaluators(long bundleId) { + if (evaluatorsByBundle.containsKey(bundleId)) { + for (String s : evaluatorsByBundle.get(bundleId)) { + evaluators.remove(s); + } + evaluatorsByBundle.remove(bundleId); + } + } + + public boolean eval(Condition condition, Item item) { + return eval(condition, item, new HashMap<String, Object>()); + } + + public boolean eval(Condition condition, Item item, Map<String, Object> context) { + String conditionEvaluatorKey = condition.getConditionType().getConditionEvaluator(); + if (condition.getConditionType().getParentCondition() != null) { + context.putAll(condition.getParameterValues()); + return eval(condition.getConditionType().getParentCondition(), item, context); + } + + if (conditionEvaluatorKey == null) { + throw new UnsupportedOperationException("No evaluator defined for : " + condition.getConditionTypeId()); + } + + if (evaluators.containsKey(conditionEvaluatorKey)) { + ConditionEvaluator evaluator = evaluators.get(conditionEvaluatorKey); + Condition contextualCondition = ConditionContextHelper.getContextualCondition(condition, context); + if (contextualCondition != null) { + return evaluator.eval(contextualCondition, item, context, this); + } else { + return 
true; + } + } + + // if no matching + return false; + } +}
