http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
new file mode 100644
index 0000000..a78cbc2
--- /dev/null
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -0,0 +1,1328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.persistence.elasticsearch;
+
+import org.apache.unomi.api.ClusterNode;
+import org.apache.unomi.api.Item;
+import org.apache.unomi.api.PartialList;
+import org.apache.unomi.api.TimestampedItem;
+import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.query.DateRange;
+import org.apache.unomi.api.query.IpRange;
+import org.apache.unomi.api.query.NumericRange;
+import org.apache.unomi.api.services.ClusterService;
+import org.apache.unomi.persistence.elasticsearch.conditions.*;
+import org.apache.unomi.persistence.spi.CustomObjectMapper;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.apache.unomi.persistence.spi.aggregate.*;
+import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
+import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.support.nodes.NodesOperationRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.collect.UnmodifiableIterator;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.SettingsException;
+import org.elasticsearch.common.unit.DistanceUnit;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.indices.IndexMissingException;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.aggregations.*;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.filter.Filter;
+import org.elasticsearch.search.aggregations.bucket.global.Global;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
+import 
org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
+import org.elasticsearch.search.aggregations.bucket.missing.MissingBuilder;
+import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder;
+import 
org.elasticsearch.search.aggregations.bucket.range.date.DateRangeBuilder;
+import 
org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4RangeBuilder;
+import 
org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
+import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleEvent;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.SynchronousBundleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+@SuppressWarnings("rawtypes")
+public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService, ClusterService, SynchronousBundleListener {
+
+    /** Number of milliseconds in one day. */
+    public static final long MILLIS_PER_DAY = 24L * 60L * 60L * 1000L;
+    private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
+    // ElasticSearch node built in start() and the client obtained from it.
+    private Node node;
+    private Client client;
+    // Configuration values injected through the setters below and applied in start().
+    private String clusterName;
+    private String indexName;
+    private String monthlyIndexNumberOfShards;
+    private String monthlyIndexNumberOfReplicas;
+    private String numberOfShards;
+    private String numberOfReplicas;
+    private Boolean nodeData;
+    private Boolean discoveryEnabled;
+    // Optional URL (as a string) of an ElasticSearch settings file loaded in start().
+    private String elasticSearchConfig = null;
+    private BundleContext bundleContext;
+    // Mapping source keyed by mapping file base name, filled by loadPredefinedMappings().
+    private Map<String, String> mappings = new HashMap<String, String>();
+    private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
+    private ConditionESQueryBuilderDispatcher 
conditionESQueryBuilderDispatcher;
+
+    // Item type -> name of the dedicated index used for that type.
+    private Map<String,String> indexNames;
+    // Item types stored in per-month indices named <indexName>-<year>-<month>.
+    private List<String> itemsMonthlyIndexed;
+    // Item type -> routing value set on index requests in save().
+    private Map<String, String> routingByType;
+
+    // Values published as node.contextserver.* settings in start().
+    private String address;
+    private String port;
+    private String secureAddress;
+    private String securePort;
+
+    // Runs the daily task scheduled in start(); cancelled in stop().
+    private Timer timer;
+
+    /**
+     * Sets the bundle context that start() uses to load predefined mappings, register this
+     * object as a bundle listener and look up already registered condition services.
+     *
+     * @param bundleContext the bundle context to use
+     */
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
+
+    /**
+     * Sets the value written to the "cluster.name" node setting in start().
+     *
+     * @param clusterName the ElasticSearch cluster name
+     */
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * Sets the name of the main index. The same name is the prefix of the monthly indices
+     * and of the "_monthlyindex" template registered in start().
+     *
+     * @param indexName the main index name
+     */
+    public void setIndexName(String indexName) {
+        this.indexName = indexName;
+    }
+
+    /**
+     * Sets the "number_of_shards" value of the monthly index template. start() parses it
+     * with Integer.parseInt, so it must be a decimal integer string.
+     *
+     * @param monthlyIndexNumberOfShards shard count for monthly indices, as a string
+     */
+    public void setMonthlyIndexNumberOfShards(String 
monthlyIndexNumberOfShards) {
+        this.monthlyIndexNumberOfShards = monthlyIndexNumberOfShards;
+    }
+
+    /**
+     * Sets the "number_of_replicas" value of the monthly index template. start() parses it
+     * with Integer.parseInt, so it must be a decimal integer string.
+     *
+     * @param monthlyIndexNumberOfReplicas replica count for monthly indices, as a string
+     */
+    public void setMonthlyIndexNumberOfReplicas(String 
monthlyIndexNumberOfReplicas) {
+        this.monthlyIndexNumberOfReplicas = monthlyIndexNumberOfReplicas;
+    }
+
+    /**
+     * Sets the value written to the "discovery.zen.ping.multicast.enabled" node setting in start().
+     *
+     * @param discoveryEnabled whether multicast discovery is enabled
+     */
+    public void setDiscoveryEnabled(Boolean discoveryEnabled) {
+        this.discoveryEnabled = discoveryEnabled;
+    }
+
+    /**
+     * Sets the value written to the "index.number_of_shards" node setting in start().
+     *
+     * @param numberOfShards the shard count, as a string
+     */
+    public void setNumberOfShards(String numberOfShards) {
+        this.numberOfShards = numberOfShards;
+    }
+
+    /**
+     * Sets the value written to the "index.number_of_replicas" node setting in start().
+     *
+     * @param numberOfReplicas the replica count, as a string
+     */
+    public void setNumberOfReplicas(String numberOfReplicas) {
+        this.numberOfReplicas = numberOfReplicas;
+    }
+
+    /**
+     * Sets the value written to the "node.data" node setting in start().
+     *
+     * @param nodeData value of the "node.data" setting
+     */
+    public void setNodeData(Boolean nodeData) {
+        this.nodeData = nodeData;
+    }
+
+    /**
+     * Sets the address published as the "node.contextserver.address" node setting.
+     * In start() the "contextserver.address" system property, when set, replaces this value.
+     *
+     * @param address the context server address
+     */
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    /**
+     * Sets the port published as the "node.contextserver.port" node setting.
+     * In start() the "contextserver.port" system property, when set, replaces this value.
+     *
+     * @param port the context server port, as a string
+     */
+    public void setPort(String port) {
+        this.port = port;
+    }
+
+    /**
+     * Sets the address published as the "node.contextserver.secureAddress" node setting.
+     * In start() the "contextserver.secureAddress" system property, when set, replaces this value.
+     *
+     * @param secureAddress the secure context server address
+     */
+    public void setSecureAddress(String secureAddress) {
+        this.secureAddress = secureAddress;
+    }
+
+    /**
+     * Sets the port published as the "node.contextserver.securePort" node setting.
+     * In start() the "contextserver.securePort" system property, when set, replaces this value.
+     *
+     * @param securePort the secure context server port, as a string
+     */
+    public void setSecurePort(String securePort) {
+        this.securePort = securePort;
+    }
+
+    /**
+     * Sets the item types that are stored in monthly indices instead of the main index.
+     *
+     * @param itemsMonthlyIndexed the monthly-indexed item type names
+     */
+    public void setItemsMonthlyIndexed(List<String> itemsMonthlyIndexed) {
+        this.itemsMonthlyIndexed = itemsMonthlyIndexed;
+    }
+
+    /**
+     * Sets the item types that live in a dedicated index. When a type is present in this map,
+     * load/save/update use the mapped index name in preference to the main or monthly index.
+     *
+     * @param indexNames map of item type to index name
+     */
+    public void setIndexNames(Map<String, String> indexNames) {
+        this.indexNames = indexNames;
+    }
+
+    /**
+     * Sets the routing values applied by save(): when an item's type is a key of this map,
+     * the mapped value is set as routing on the index request.
+     *
+     * @param routingByType map of item type to routing value
+     */
+    public void setRoutingByType(Map<String, String> routingByType) {
+        this.routingByType = routingByType;
+    }
+
+    /**
+     * Sets the location of an optional ElasticSearch settings file. When non-empty, start()
+     * parses the value as a URL and loads node settings from it.
+     *
+     * @param elasticSearchConfig URL of the settings file, or null/empty for none
+     */
+    public void setElasticSearchConfig(String elasticSearchConfig) {
+        this.elasticSearchConfig = elasticSearchConfig;
+    }
+
+    /**
+     * Sets the dispatcher on which ConditionEvaluator services are registered (start(),
+     * bundleChanged()) and unregistered (bundleChanged()).
+     *
+     * @param conditionEvaluatorDispatcher the evaluator dispatcher
+     */
+    public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher 
conditionEvaluatorDispatcher) {
+        this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher;
+    }
+
+    /**
+     * Sets the dispatcher on which ConditionESQueryBuilder services are registered and
+     * unregistered, and which removeByQuery() uses to turn a Condition into a query.
+     *
+     * @param conditionESQueryBuilderDispatcher the query builder dispatcher
+     */
+    public void 
setConditionESQueryBuilderDispatcher(ConditionESQueryBuilderDispatcher 
conditionESQueryBuilderDispatcher) {
+        this.conditionESQueryBuilderDispatcher = 
conditionESQueryBuilderDispatcher;
+    }
+
+    /**
+     * Starts the persistence backend: loads the predefined mappings of this bundle, builds
+     * the ElasticSearch node and client, creates the main index when it is not found,
+     * registers the monthly index template, makes sure the current month's index exists,
+     * registers the condition services already published, and schedules a daily task that
+     * creates the next month's index one day ahead.
+     */
+    public void start() {
+
+        loadPredefinedMappings(bundleContext, false);
+
+        // on startup
+        new InClassLoaderExecute<Object>() {
+            public Object execute(Object... args) {
+                logger.info("Starting ElasticSearch persistence backend using 
cluster name " + clusterName + " and index name " + indexName + "...");
+                // Optional settings loaded from the configured URL; a bad URL or unreadable
+                // settings are only logged and start-up continues without them.
+                Map<String, String> settings = null;
+                if (elasticSearchConfig != null && 
elasticSearchConfig.length() > 0) {
+                    try {
+                        URL elasticSearchConfigURL = new 
URL(elasticSearchConfig);
+                        Settings.Builder settingsBuilder = 
ImmutableSettings.builder().loadFromUrl(elasticSearchConfigURL);
+                        settings = settingsBuilder.build().getAsMap();
+                        logger.info("Successfully loaded ElasticSearch 
configuration from " + elasticSearchConfigURL);
+                    } catch (MalformedURLException e) {
+                        logger.error("Error in ElasticSearch configuration URL 
", e);
+                    } catch (SettingsException se) {
+                        logger.info("Error trying to load settings from " + 
elasticSearchConfig + ": " + se.getMessage() + " (activate debug mode for 
exception details)");
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Exception details", se);
+                        }
+                    }
+                }
+
+                // System properties, when set, replace the injected address/port values.
+                address = System.getProperty("contextserver.address", address);
+                port = System.getProperty("contextserver.port", port);
+                secureAddress = 
System.getProperty("contextserver.secureAddress", secureAddress);
+                securePort = System.getProperty("contextserver.securePort", 
securePort);
+
+                ImmutableSettings.Builder settingsBuilder = 
ImmutableSettings.builder();
+                if (settings != null) {
+                    settingsBuilder.put(settings);
+                }
+
+                // Applied after the loaded settings, so these keys win over the settings file.
+                settingsBuilder.put("cluster.name", clusterName)
+                        .put("node.data", nodeData)
+                        .put("discovery.zen.ping.multicast.enabled", 
discoveryEnabled)
+                        .put("index.number_of_replicas", numberOfReplicas)
+                        .put("index.number_of_shards", numberOfShards)
+                        .put("node.contextserver.address", address)
+                        .put("node.contextserver.port", port)
+                        .put("node.contextserver.secureAddress", secureAddress)
+                        .put("node.contextserver.securePort", securePort);
+
+                node = nodeBuilder().settings(settingsBuilder).node();
+                client = node.client();
+                // @todo is there a better way to detect index existence than 
to wait for it to startup ?
+                // Polls at most 20 times, sleeping 100 ms after every check (also after a
+                // successful one).
+                // NOTE(review): an InterruptedException is only logged; the interrupt flag is
+                // not restored and polling continues - confirm this is intended.
+                boolean indexExists = false;
+                int tries = 0;
+                while (!indexExists && tries < 20) {
+                    IndicesExistsResponse indicesExistsResponse = 
client.admin().indices().prepareExists(indexName).execute().actionGet();
+                    indexExists = indicesExistsResponse.isExists();
+                    tries++;
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        logger.error("Interrupted", e);
+                    }
+                }
+                if (!indexExists) {
+                    logger.info("{} index doesn't exist yet, creating it...", 
indexName);
+                    // The main index only receives mappings of types that are neither
+                    // monthly-indexed nor assigned to a dedicated index.
+                    Map<String,String> indexMappings = new 
HashMap<String,String>();
+                    for (Map.Entry<String, String> entry : 
mappings.entrySet()) {
+                        if (!itemsMonthlyIndexed.contains(entry.getKey()) && 
!indexNames.containsKey(entry.getKey())) {
+                            indexMappings.put(entry.getKey(), 
entry.getValue());
+                        }
+                    }
+
+                    internalCreateIndex(indexName, indexMappings);
+                }
+
+                // Template matching "<indexName>-*", i.e. the names produced by getMonthlyIndex().
+                client.admin().indices().preparePutTemplate(indexName + 
"_monthlyindex")
+                        .setTemplate(indexName + "-*")
+                        .setOrder(1)
+                        .setSettings(ImmutableSettings.settingsBuilder()
+                                .put("number_of_shards", 
Integer.parseInt(monthlyIndexNumberOfShards))
+                                .put("number_of_replicas", 
Integer.parseInt(monthlyIndexNumberOfReplicas))
+                                .build()).execute().actionGet();
+
+                // Make sure the index for the current month exists.
+                getMonthlyIndex(new Date(), true);
+
+                return null;
+            }
+        }.executeInClassLoader();
+
+
+        bundleContext.addBundleListener(this);
+
+        // Register the evaluators and query builders that are already published as services;
+        // services of bundles started later are picked up in bundleChanged().
+        try {
+            for (ServiceReference<ConditionEvaluator> reference : 
bundleContext.getServiceReferences(ConditionEvaluator.class, null)) {
+                ConditionEvaluator service = 
bundleContext.getService(reference);
+                
conditionEvaluatorDispatcher.addEvaluator(reference.getProperty("conditionEvaluatorId").toString(),
 reference.getBundle().getBundleId(), service);
+            }
+            for (ServiceReference<ConditionESQueryBuilder> reference : 
bundleContext.getServiceReferences(ConditionESQueryBuilder.class, null)) {
+                ConditionESQueryBuilder service = 
bundleContext.getService(reference);
+                
conditionESQueryBuilderDispatcher.addQueryBuilder(reference.getProperty("queryBuilderId").toString(),
 reference.getBundle().getBundleId(), service);
+            }
+        } catch (Exception e) {
+            logger.error("Cannot get services", e);
+        }
+
+        timer = new Timer();
+
+        // First run after 10 seconds, then every 24 hours: when tomorrow falls in a
+        // different month than today, create that month's index ahead of time.
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                GregorianCalendar gc = new GregorianCalendar();
+                int thisMonth = gc.get(Calendar.MONTH);
+                gc.add(Calendar.DAY_OF_MONTH, 1);
+                if (gc.get(Calendar.MONTH) != thisMonth) {
+                    getMonthlyIndex(gc.getTime(), true);
+                }
+            }
+        }, 10000L, 24L * 60L * 60L * 1000L);
+    }
+
+    /**
+     * Stops the persistence backend: cancels the daily monthly-index task, closes the
+     * ElasticSearch node and unregisters this object as a bundle listener.
+     *
+     * The timer is cancelled before the node is closed so that the scheduled task cannot
+     * fire against a node that is shutting down. The method is also safe to call when
+     * start() failed before the node was created: the missing node is skipped and the
+     * bundle listener is still removed.
+     */
+    public void stop() {
+
+        if (timer != null) {
+            timer.cancel();
+            timer = null;
+        }
+
+        try {
+            new InClassLoaderExecute<Object>() {
+                protected Object execute(Object... args) {
+                    logger.info("Closing ElasticSearch persistence backend...");
+                    // node is still null when start() failed before building it
+                    if (node != null) {
+                        node.close();
+                    }
+                    return null;
+                }
+            }.executeInClassLoader();
+        } finally {
+            // Always unregister, even if closing the node failed.
+            bundleContext.removeBundleListener(this);
+        }
+    }
+
+    /**
+     * Keeps the condition dispatchers and the index mappings in sync with bundle life-cycle
+     * events. When a bundle has started, its ConditionEvaluator and ConditionESQueryBuilder
+     * services are registered and its predefined mappings are loaded and pushed to
+     * ElasticSearch. When a bundle is stopping, everything registered under its bundle id
+     * is removed from both dispatchers. Other event types are ignored.
+     *
+     * @param event the bundle event delivered by the framework
+     */
+    @Override
+    public void bundleChanged(BundleEvent event) {
+        final int eventType = event.getType();
+        if (eventType == BundleEvent.STARTED) {
+            if (event.getBundle() != null && event.getBundle().getRegisteredServices() != null) {
+                for (ServiceReference<?> serviceReference : event.getBundle().getRegisteredServices()) {
+                    Object candidate = bundleContext.getService(serviceReference);
+                    if (candidate instanceof ConditionEvaluator) {
+                        String evaluatorId = serviceReference.getProperty("conditionEvaluatorId").toString();
+                        conditionEvaluatorDispatcher.addEvaluator(evaluatorId, event.getBundle().getBundleId(), (ConditionEvaluator) candidate);
+                    }
+                    if (candidate instanceof ConditionESQueryBuilder) {
+                        String queryBuilderId = serviceReference.getProperty("queryBuilderId").toString();
+                        conditionESQueryBuilderDispatcher.addQueryBuilder(queryBuilderId, event.getBundle().getBundleId(), (ConditionESQueryBuilder) candidate);
+                    }
+                }
+            }
+            // Mappings shipped by the started bundle are also created in ElasticSearch.
+            loadPredefinedMappings(event.getBundle().getBundleContext(), true);
+        } else if (eventType == BundleEvent.STOPPING) {
+            conditionEvaluatorDispatcher.removeEvaluators(event.getBundle().getBundleId());
+            conditionESQueryBuilderDispatcher.removeQueryBuilders(event.getBundle().getBundleId());
+        }
+    }
+
+    /**
+     * Returns the name of the monthly index for the given date without checking whether
+     * that index exists (delegates with checkAndCreate set to false).
+     *
+     * @param date the date whose month selects the index
+     * @return the monthly index name
+     */
+    private String getMonthlyIndex(Date date) {
+        return getMonthlyIndex(date, false);
+    }
+
+    /**
+     * Returns the name of the monthly index for the given date, which is the main index
+     * name followed by "-&lt;year&gt;-&lt;month&gt;", and optionally creates that index.
+     *
+     * @param date           the date whose calendar year and month select the index
+     * @param checkAndCreate when true, the index is created (with the mappings of all
+     *                       monthly-indexed item types) if it does not exist yet
+     * @return the monthly index name
+     */
+    private String getMonthlyIndex(Date date, boolean checkAndCreate) {
+        // "yyyy" is the calendar year. The previous "YYYY" pattern is the week-based year,
+        // which differs from the calendar year for days around New Year (for example
+        // 2018-12-31 would be formatted as "-2019-12") and sent items to the wrong index.
+        String d = new SimpleDateFormat("-yyyy-MM").format(date);
+        String monthlyIndexName = indexName + d;
+
+        if (checkAndCreate) {
+            IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(monthlyIndexName).execute().actionGet();
+            boolean indexExists = indicesExistsResponse.isExists();
+            if (!indexExists) {
+                logger.info("{} index doesn't exist yet, creating it...", monthlyIndexName);
+
+                // A monthly index only receives the mappings of monthly-indexed item types.
+                Map<String,String> indexMappings = new HashMap<String,String>();
+                for (Map.Entry<String, String> entry : mappings.entrySet()) {
+                    if (itemsMonthlyIndexed.contains(entry.getKey())) {
+                        indexMappings.put(entry.getKey(), entry.getValue());
+                    }
+                }
+
+                internalCreateIndex(monthlyIndexName, indexMappings);
+                logger.info("{} index created.", monthlyIndexName);
+            }
+        }
+        return monthlyIndexName;
+    }
+
+    /**
+     * Loads every "*.json" entry found under "META-INF/cxs/mappings" in the bundle of the
+     * given context and stores its content in the mappings map, keyed by the file's base
+     * name (the mapped item type). Optionally the mapping is also put on the matching
+     * index: all monthly indices for monthly-indexed types, the dedicated index for types
+     * listed in indexNames, the main index otherwise.
+     *
+     * A failure on one mapping file is logged and does not prevent the remaining files
+     * from being processed.
+     *
+     * @param bundleContext context of the bundle to scan; nothing is done when it is null
+     * @param createMapping when true, each loaded mapping is also sent to ElasticSearch
+     */
+    private void loadPredefinedMappings(BundleContext bundleContext, boolean createMapping) {
+        // Bundle.getBundleContext() may return null for a bundle without a valid context.
+        if (bundleContext == null) {
+            return;
+        }
+        Enumeration<URL> predefinedMappings = bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true);
+        if (predefinedMappings == null) {
+            return;
+        }
+        while (predefinedMappings.hasMoreElements()) {
+            URL predefinedMappingURL = predefinedMappings.nextElement();
+            logger.debug("Found mapping at " + predefinedMappingURL + ", loading... ");
+            try {
+                final String path = predefinedMappingURL.getPath();
+                String name = path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.'));
+
+                StringBuilder content = new StringBuilder();
+                // try-with-resources closes the bundle entry stream, which was leaked before.
+                // JSON is read as UTF-8 instead of the platform default charset.
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(predefinedMappingURL.openStream(), "UTF-8"))) {
+                    String l;
+                    while ((l = reader.readLine()) != null) {
+                        content.append(l);
+                    }
+                }
+                String mappingSource = content.toString();
+                mappings.put(name, mappingSource);
+                if (createMapping) {
+                    if (itemsMonthlyIndexed.contains(name)) {
+                        createMapping(name, mappingSource, indexName + "-*");
+                    } else if (indexNames.containsKey(name)) {
+                        createMapping(name, mappingSource, indexNames.get(name));
+                    } else {
+                        createMapping(name, mappingSource, indexName);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("Error while loading mapping definition " + predefinedMappingURL, e);
+            }
+        }
+    }
+
+
+    /**
+     * Returns all items of the given class as a plain list, by delegating to the paged
+     * variant with offset 0, size -1 and no sort field.
+     * NOTE(review): size -1 presumably means "no limit" - confirm against query().
+     *
+     * @param clazz the item class to load
+     * @return the list held by the resulting PartialList
+     */
+    @Override
+    public <T extends Item> List<T> getAllItems(final Class<T> clazz) {
+        return getAllItems(clazz, 0, -1, null).getList();
+    }
+
+    /**
+     * Counts the items of the given type by running queryCount with a match-all filter.
+     *
+     * @param itemType the item type to count
+     * @return the count reported by queryCount
+     */
+    @Override
+    public long getAllItemsCount(String itemType) {
+        return queryCount(FilterBuilders.matchAllFilter(), itemType);
+    }
+
+    /**
+     * Returns a page of the items of the given class by running a match-all query.
+     *
+     * @param clazz  the item class to load
+     * @param offset passed to query() as the offset
+     * @param size   passed to query() as the size
+     * @param sortBy passed to query() as the sort specification, may be null
+     * @return the PartialList produced by query()
+     */
+    @Override
+    public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, 
int offset, int size, String sortBy) {
+        return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, 
size, null);
+    }
+
+    /**
+     * Loads an item by id without a date hint (delegates with a null dateHint).
+     *
+     * @param itemId the id of the item
+     * @param clazz  the item class
+     * @return the item, or null when the delegate returns null
+     */
+    @Override
+    public <T extends Item> T load(final String itemId, final Class<T> clazz) {
+        return load(itemId, null, clazz);
+    }
+
+    /**
+     * Loads an item by id.
+     *
+     * The item type is read from the static ITEM_TYPE field of the class. For a
+     * monthly-indexed type without a date hint the item is looked up with an ids query,
+     * because the index holding it is not known. Otherwise a direct get is issued on the
+     * dedicated index of the type, on the monthly index of the date hint, or on the main
+     * index, in that order of preference.
+     *
+     * @param itemId   the id of the item
+     * @param dateHint date used to pick the monthly index, may be null
+     * @param clazz    the item class, which must expose a public static ITEM_TYPE field
+     * @return the item, or null when it does not exist or loading failed (failures are logged)
+     */
+    @Override
+    public <T extends Item> T load(final String itemId, final Date dateHint, final Class<T> clazz) {
+        return new InClassLoaderExecute<T>() {
+            protected T execute(Object... args) {
+                try {
+                    String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
+
+                    if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) {
+                        PartialList<T> r = query(QueryBuilders.idsQuery(itemType).ids(itemId), null, clazz, 0, 1, null);
+                        if (r.size() > 0) {
+                            return r.get(0);
+                        }
+                        return null;
+                    } else {
+                        String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
+                                (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(dateHint) : indexName);
+
+                        GetResponse response = client.prepareGet(index, itemType, itemId)
+                                .execute()
+                                .actionGet();
+                        if (response.isExists()) {
+                            String sourceAsString = response.getSourceAsString();
+                            final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
+                            // The id is taken from the response and set on the item explicitly.
+                            value.setItemId(response.getId());
+                            return value;
+                        } else {
+                            return null;
+                        }
+                    }
+                } catch (IndexMissingException e) {
+                    // A missing index just means the item cannot exist yet.
+                    logger.debug("No index found for itemType=" + clazz.getName() + ", itemId=" + itemId, e);
+                } catch (Exception e) {
+                    // Covers reflection failures on ITEM_TYPE as well as deserialization errors.
+                    logger.error("Error loading itemType=" + clazz.getName() + ", itemId=" + itemId, e);
+                }
+                return null;
+            }
+        }.executeInClassLoader();
+
+    }
+
+    /**
+     * Serializes the item to JSON and indexes it under its item id.
+     *
+     * The target index is the dedicated index of the item type, the monthly index of the
+     * item's timestamp for monthly-indexed types, or the main index. When the monthly index
+     * is missing it is created and the request is retried once. A missing index for any
+     * other type is reported as a failure instead of being silently ignored.
+     *
+     * @param item the item to store; monthly-indexed items must be TimestampedItem instances
+     * @return true when the item was indexed, false when serialization failed or the
+     *         target index is missing and cannot be created here
+     */
+    @Override
+    public boolean save(final Item item) {
+
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                try {
+                    String source = CustomObjectMapper.getObjectMapper().writeValueAsString(item);
+                    String itemType = item.getItemType();
+                    String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
+                            (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(((TimestampedItem) item).getTimeStamp()) : indexName);
+                    IndexRequestBuilder indexBuilder = client.prepareIndex(index, itemType, item.getItemId())
+                            .setSource(source);
+                    if (routingByType.containsKey(itemType)) {
+                        indexBuilder = indexBuilder.setRouting(routingByType.get(itemType));
+                    }
+                    try {
+                        indexBuilder.execute().actionGet();
+                    } catch (IndexMissingException e) {
+                        if (!itemsMonthlyIndexed.contains(itemType)) {
+                            // Previously this case fell through and reported success although
+                            // nothing had been stored.
+                            logger.error("Error saving item " + item + ", index " + index + " is missing", e);
+                            return false;
+                        }
+                        // Create the monthly index on demand, then retry once.
+                        getMonthlyIndex(((TimestampedItem) item).getTimeStamp(), true);
+                        indexBuilder.execute().actionGet();
+                    }
+                    return true;
+                } catch (IOException e) {
+                    logger.error("Error saving item " + item, e);
+                }
+                return false;
+            }
+        }.executeInClassLoader();
+
+    }
+
+    /**
+     * Updates a single property of an item by delegating to the map-based update with a
+     * singleton map of propertyName to propertyValue.
+     *
+     * @param itemId        the id of the item
+     * @param dateHint      date used to pick the monthly index, may be null
+     * @param clazz         the item class
+     * @param propertyName  name of the property to set
+     * @param propertyValue new value of the property
+     * @return the result of the map-based update
+     */
+    @Override
+    public boolean update(final String itemId, final Date dateHint, final 
Class clazz, final String propertyName, final Object propertyValue) {
+        return update(itemId, dateHint, clazz, 
Collections.singletonMap(propertyName, propertyValue));
+    }
+
+    /**
+     * Applies a partial document update to an item.
+     *
+     * The item type is read from the static ITEM_TYPE field of the class. The target index
+     * is the dedicated index of the type, the monthly index of the date hint (monthly-indexed
+     * types with a non-null hint only), or the main index.
+     *
+     * @param itemId   the id of the item
+     * @param dateHint date used to pick the monthly index, may be null
+     * @param clazz    the item class, which must expose a public static ITEM_TYPE field
+     * @param source   the fields to merge into the stored document
+     * @return true when the update request completed, false when the index is missing or
+     *         the item type could not be read (both are logged)
+     */
+    @Override
+    public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) {
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                try {
+                    String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
+
+                    String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
+                            (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+
+                    client.prepareUpdate(index, itemType, itemId).setDoc(source)
+                            .execute()
+                            .actionGet();
+                    return true;
+                } catch (IndexMissingException e) {
+                    // Separator added: the message used to read "itemType=<class>itemId=<id>".
+                    logger.debug("No index found for itemType=" + clazz.getName() + ", itemId=" + itemId, e);
+                } catch (NoSuchFieldException | IllegalAccessException e) {
+                    logger.error("Error updating item " + itemId, e);
+                }
+                return false;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Updates an item by running an inline script against the stored document.
+     *
+     * The item type is read from the static ITEM_TYPE field of the class. The target index
+     * is the dedicated index of the type, the monthly index of the date hint (monthly-indexed
+     * types with a non-null hint only), or the main index.
+     *
+     * @param itemId       the id of the item
+     * @param dateHint     date used to pick the monthly index, may be null
+     * @param clazz        the item class, which must expose a public static ITEM_TYPE field
+     * @param script       the inline script source
+     * @param scriptParams parameters passed to the script
+     * @return true when the update request completed, false when the index is missing or
+     *         the item type could not be read (both are logged)
+     */
+    @Override
+    public boolean update(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                try {
+                    String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
+
+                    String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
+                            (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+
+                    client.prepareUpdate(index, itemType, itemId).setScript(script, ScriptService.ScriptType.INLINE).setScriptParams(scriptParams)
+                            .execute()
+                            .actionGet();
+                    return true;
+                } catch (IndexMissingException e) {
+                    // Separator added: the message used to read "itemType=<class>itemId=<id>".
+                    logger.debug("No index found for itemType=" + clazz.getName() + ", itemId=" + itemId, e);
+                } catch (NoSuchFieldException | IllegalAccessException e) {
+                    logger.error("Error updating item " + itemId, e);
+                }
+                return false;
+            }
+        }.executeInClassLoader();
+    }
+
+    @Override
+    public <T extends Item> boolean remove(final String itemId, final Class<T> 
clazz) {
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                //Index the query = register it in the percolator
+                try {
+                    String itemType = (String) 
clazz.getField("ITEM_TYPE").get(null);
+
+                    client.prepareDelete(getIndexNameForQuery(itemType), 
itemType, itemId)
+                            .execute().actionGet();
+                    return true;
+                } catch (Exception e) {
+                    logger.error("Cannot remove", e);
+                }
+                return false;
+            }
+        }.executeInClassLoader();
+    }
+
+    public <T extends Item> boolean removeByQuery(final Condition query, final 
Class<T> clazz) {
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                try {
+                    String itemType = (String) 
clazz.getField("ITEM_TYPE").get(null);
+
+                    client.prepareDeleteByQuery(getIndexNameForQuery(itemType))
+                            
.setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
+                            .execute().actionGet();
+                    return true;
+                } catch (Exception e) {
+                    logger.error("Cannot remove by query", e);
+                }
+                return false;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Creates the named index unless it already exists. The new index receives the loaded
+     * mappings of every item type that indexNames assigns to this index name.
+     *
+     * @param indexName the name of the index to create
+     * @return true when the index was created, false when it already existed
+     */
+    public boolean createIndex(final String indexName) {
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                IndicesExistsResponse existsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet();
+                if (existsResponse.isExists()) {
+                    return false;
+                }
+                Map<String,String> dedicatedMappings = new HashMap<String,String>();
+                for (Map.Entry<String, String> mapping : mappings.entrySet()) {
+                    String mappedType = mapping.getKey();
+                    if (indexNames.containsKey(mappedType) && indexNames.get(mappedType).equals(indexName)) {
+                        dedicatedMappings.put(mappedType, mapping.getValue());
+                    }
+                }
+                internalCreateIndex(indexName, dedicatedMappings);
+                return true;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Deletes the named index when it exists.
+     *
+     * @param indexName the name of the index to delete
+     * @return true when the index existed and a delete was issued, false otherwise
+     */
+    public boolean removeIndex(final String indexName) {
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                IndicesExistsResponse existsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet();
+                if (!existsResponse.isExists()) {
+                    return false;
+                }
+                client.admin().indices().prepareDelete(indexName).execute().actionGet();
+                return true;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Creates an index with the analysis settings below and the given type mappings.
+     * The settings declare a pattern tokenizer "myTokenizer" (pattern ".*", group 0) and a
+     * custom analyzer "folding" that combines it with the lowercase and asciifolding filters.
+     * No existence check is done here; callers check before calling.
+     *
+     * @param indexName the name of the index to create
+     * @param mappings  mapping source by type name, each added to the create request
+     */
+    private void internalCreateIndex(String indexName, Map<String,String> 
mappings) {
+        CreateIndexRequestBuilder builder = 
client.admin().indices().prepareCreate(indexName)
+                .setSettings("{\n" +
+                        "    \"analysis\": {\n" +
+                        "      \"tokenizer\": {\n" +
+                        "        \"myTokenizer\": {\n" +
+                        "          \"type\":\"pattern\",\n" +
+                        "          \"pattern\":\".*\",\n" +
+                        "          \"group\":0\n" +
+                        "        }\n" +
+                        "      },\n" +
+                        "      \"analyzer\": {\n" +
+                        "        \"folding\": {\n" +
+                        "          \"type\":\"custom\",\n" +
+                        "          \"tokenizer\": \"myTokenizer\",\n" +
+                        "          \"filter\":  [ \"lowercase\", 
\"asciifolding\" ]\n" +
+                        "        }\n" +
+                        "      }\n" +
+                        "    }\n" +
+                        "}\n");
+
+        for (Map.Entry<String, String> entry : mappings.entrySet()) {
+            builder.addMapping(entry.getKey(), entry.getValue());
+        }
+
+        builder.execute().actionGet();
+    }
+
+
+    /**
+     * Puts the mapping {@code source} for {@code type} on the index {@code indexName} and waits
+     * for the request to complete.
+     *
+     * @return always {@code true}; a failure surfaces as an exception from the client call
+     */
+    private boolean createMapping(final String type, final String source, final String indexName) {
+        client.admin().indices().preparePutMapping(indexName).setType(type).setSource(source)
+                .execute()
+                .actionGet();
+        return true;
+    }
+
+    /**
+     * Returns the "properties" section of the mapping of {@code itemType}, merged across every
+     * index returned by the get-mappings request. When the same property appears in several
+     * indices, its attributes are merged; nested map attributes are merged with {@code putAll},
+     * other attributes are overwritten by the later index.
+     *
+     * @param itemType the mapping type to look up
+     * @return property definitions keyed by property name; empty when nothing is found or the
+     *         mapping source cannot be read
+     */
+    @Override
+    public Map<String, Map<String, Object>> getMapping(final String itemType) {
+        return new InClassLoaderExecute<Map<String, Map<String, Object>>>() {
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            protected Map<String, Map<String, Object>> execute(Object... args) {
+                GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings().setTypes(itemType).execute().actionGet();
+                // Not called "mappings" so that it does not shadow the service field of that name.
+                ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappingsByIndex = getMappingsResponse.getMappings();
+                Map<String, Map<String, Object>> propertyMap = new HashMap<>();
+                try {
+                    UnmodifiableIterator<ImmutableOpenMap<String, MappingMetaData>> it = mappingsByIndex.valuesIt();
+                    while (it.hasNext()) {
+                        MappingMetaData typeMapping = it.next().get(itemType);
+                        if (typeMapping == null) {
+                            // this index carries no mapping for the requested type
+                            continue;
+                        }
+                        Map<String, Map<String, Object>> properties = (Map<String, Map<String, Object>>) typeMapping.getSourceAsMap().get("properties");
+                        if (properties == null) {
+                            // a mapping without a "properties" section has nothing to contribute
+                            continue;
+                        }
+                        for (Map.Entry<String, Map<String, Object>> entry : properties.entrySet()) {
+                            Map<String, Object> subPropMap = propertyMap.get(entry.getKey());
+                            if (subPropMap == null) {
+                                propertyMap.put(entry.getKey(), entry.getValue());
+                                continue;
+                            }
+                            for (Map.Entry<String, Object> subentry : entry.getValue().entrySet()) {
+                                Object existing = subPropMap.get(subentry.getKey());
+                                if (existing instanceof Map && subentry.getValue() instanceof Map) {
+                                    ((Map) existing).putAll((Map) subentry.getValue());
+                                } else {
+                                    subPropMap.put(subentry.getKey(), subentry.getValue());
+                                }
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    logger.error("Cannot get mapping", e);
+                }
+                return propertyMap;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Registers {@code query} under the id {@code queryName} by indexing it with the
+     * ".percolator" type of the main index.
+     *
+     * @param queryName id under which the query is stored
+     * @param query     source of the query document
+     * @return {@code true} if the query was indexed, {@code false} if an exception occurred
+     */
+    public boolean saveQuery(final String queryName, final String query) {
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                boolean saved;
+                try {
+                    logger.info("Saving query : " + queryName);
+                    client.prepareIndex(indexName, ".percolator", queryName)
+                            .setSource(query)
+                            // refresh so that the query can be used right away
+                            .setRefresh(true)
+                            .execute()
+                            .actionGet();
+                    saved = true;
+                } catch (Exception e) {
+                    logger.error("Cannot save query", e);
+                    saved = false;
+                }
+                return saved;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Converts {@code query} to its Elasticsearch form and registers it under {@code queryName}.
+     *
+     * @param queryName id under which the query is stored
+     * @param query     condition to register; may be {@code null}
+     * @return {@code false} if {@code query} is {@code null} or the query could not be saved,
+     *         {@code true} otherwise
+     */
+    @Override
+    public boolean saveQuery(String queryName, Condition query) {
+        if (query == null) {
+            return false;
+        }
+        // Propagate the outcome of the actual save instead of reporting success unconditionally.
+        return saveQuery(queryName, conditionESQueryBuilderDispatcher.getQuery(query));
+    }
+
+    /**
+     * Removes the query stored under {@code queryName} by deleting the corresponding
+     * ".percolator" document from the main index.
+     *
+     * @param queryName id of the stored query
+     * @return {@code true} if the delete request completed, {@code false} if an exception occurred
+     */
+    @Override
+    public boolean removeQuery(final String queryName) {
+        return new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                boolean removed;
+                try {
+                    client.prepareDelete(indexName, ".percolator", queryName)
+                            // refresh so that the removal is visible right away
+                            .setRefresh(true)
+                            .execute()
+                            .actionGet();
+                    removed = true;
+                } catch (Exception e) {
+                    logger.error("Cannot delete query", e);
+                    removed = false;
+                }
+                return removed;
+            }
+        }.executeInClassLoader();
+    }
+
+    @Override
+    public List<String> getMatchingSavedQueries(final Item item) {
+        return new InClassLoaderExecute<List<String>>() {
+            protected List<String> execute(Object... args) {
+                List<String> matchingQueries = new ArrayList<String>();
+                try {
+                    String source = 
CustomObjectMapper.getObjectMapper().writeValueAsString(item);
+
+                    String itemType = item.getItemType();
+
+                    //Percolate
+                    PercolateResponse response = client.preparePercolate()
+                            .setIndices(indexName)
+                            .setDocumentType(itemType)
+                            .setSource("{doc:" + source + 
"}").execute().actionGet();
+                    //Iterate over the results
+                    for (PercolateResponse.Match match : response) {
+                        //Handle the result which is the name of
+                        //the query in the percolator
+                        matchingQueries.add(match.getId().string());
+                    }
+                } catch (IOException e) {
+                    logger.error("Error getting matching saved queries for 
item=" + item, e);
+                }
+                return matchingQueries;
+            }
+        }.executeInClassLoader();
+
+    }
+
+    /**
+     * Tests whether {@code item} satisfies {@code query}. The condition is first evaluated
+     * through the evaluator dispatcher; if that throws {@link UnsupportedOperationException},
+     * the method falls back to counting the documents that have the item's id and match the
+     * condition's filter.
+     *
+     * @param query the condition to test
+     * @param item  the item to test the condition against
+     * @return {@code true} if the item matches; {@code false} if it does not or if the item
+     *         type cannot be determined
+     */
+    @Override
+    public boolean testMatch(Condition query, Item item) {
+        try {
+            return conditionEvaluatorDispatcher.eval(query, item);
+        } catch (UnsupportedOperationException e) {
+            logger.error("Eval not supported, continue with query", e);
+        }
+        // Use the shared lookup rather than repeating the reflection here; it logs the cause
+        // and returns null when the class has no accessible ITEM_TYPE field.
+        String itemType = getItemType(item.getClass());
+        if (itemType == null) {
+            return false;
+        }
+        FilterBuilder builder = FilterBuilders.andFilter(
+                FilterBuilders.idsFilter(itemType).ids(item.getItemId()),
+                conditionESQueryBuilderDispatcher.buildFilter(query));
+        return queryCount(builder, itemType) > 0;
+    }
+
+    /**
+     * Returns every item of type {@code clazz} matching {@code query}.
+     *
+     * @param sortBy sort specification, or {@code null} for none
+     */
+    @Override
+    public <T extends Item> List<T> query(final Condition query, String sortBy, final Class<T> clazz) {
+        // offset 0 with size -1 asks the paged variant for all matches
+        final PartialList<T> allMatches = query(query, sortBy, clazz, 0, -1);
+        return allMatches.getList();
+    }
+
+    /**
+     * Returns one page of the items of type {@code clazz} matching {@code query}. No routing
+     * is applied.
+     *
+     * @param offset index of the first hit to return
+     * @param size   maximum number of hits, or -1 for no limit
+     */
+    @Override
+    public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
+        final QueryBuilder conditionQuery = conditionESQueryBuilderDispatcher.getQueryBuilder(query);
+        return query(conditionQuery, sortBy, clazz, offset, size, null);
+    }
+
+    /**
+     * Returns one page of the items that match both the full-text expression (run as a query
+     * string against the "_all" field) and the given condition. No routing is applied.
+     */
+    @Override
+    public <T extends Item> PartialList<T> queryFullText(final String fulltext, final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
+        final QueryBuilder fullTextQuery = QueryBuilders.queryStringQuery(fulltext).defaultField("_all");
+        final QueryBuilder conditionQuery = conditionESQueryBuilderDispatcher.getQueryBuilder(query);
+        final QueryBuilder combined = QueryBuilders.boolQuery().must(fullTextQuery).must(conditionQuery);
+        return query(combined, sortBy, clazz, offset, size, null);
+    }
+
+    /**
+     * Returns every item of type {@code clazz} whose {@code fieldName} equals {@code fieldValue}.
+     */
+    @Override
+    public <T extends Item> List<T> query(final String fieldName, final String fieldValue, String sortBy, final Class<T> clazz) {
+        // offset 0 with size -1 asks the paged variant for all matches
+        final PartialList<T> allMatches = query(fieldName, fieldValue, sortBy, clazz, 0, -1);
+        return allMatches.getList();
+    }
+
+    /**
+     * Returns every item of type {@code clazz} whose {@code fieldName} matches one of
+     * {@code fieldValues}. The values are ASCII-folded before being used as terms, and are
+     * used as routing values when {@code fieldName} is the routing field of the item type.
+     */
+    @Override
+    public <T extends Item> List<T> query(final String fieldName, final String[] fieldValues, String sortBy, final Class<T> clazz) {
+        final QueryBuilder anyOfValues = QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues));
+        final String[] routing = getRouting(fieldName, fieldValues, clazz);
+        return query(anyOfValues, sortBy, clazz, 0, -1, routing).getList();
+    }
+
+    /**
+     * Returns one page of the items of type {@code clazz} whose {@code fieldName} equals
+     * {@code fieldValue}. The value is ASCII-folded before being used as the term, and is used
+     * as routing value when {@code fieldName} is the routing field of the item type.
+     */
+    @Override
+    public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) {
+        final QueryBuilder exactValue = QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue));
+        final String[] routing = getRouting(fieldName, new String[]{fieldValue}, clazz);
+        return query(exactValue, sortBy, clazz, offset, size, routing);
+    }
+
+    @Override
+    public <T extends Item> PartialList<T> queryFullText(String fieldName, 
String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, 
int size) {
+        return 
query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(QueryBuilders.termQuery(fieldName,
 fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new 
String[]{fieldValue}, clazz));
+    }
+
+    /**
+     * Returns one page of the items of type {@code clazz} matching the full-text expression,
+     * run as a query string against the "_all" field.
+     */
+    @Override
+    public <T extends Item> PartialList<T> queryFullText(String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
+        final QueryBuilder fullTextQuery = QueryBuilders.queryStringQuery(fulltext).defaultField("_all");
+        // the routing lookup is made with "_all" as the field name
+        final String[] routing = getRouting("_all", new String[]{fulltext}, clazz);
+        return query(fullTextQuery, sortBy, clazz, offset, size, routing);
+    }
+
+    /**
+     * Returns one page of the items of type {@code clazz} whose {@code fieldName} lies between
+     * {@code from} and {@code to}. No routing is applied.
+     */
+    @Override
+    public <T extends Item> PartialList<T> rangeQuery(String fieldName, String from, String to, String sortBy, Class<T> clazz, int offset, int size) {
+        final RangeQueryBuilder range = QueryBuilders.rangeQuery(fieldName).from(from).to(to);
+        return query(range, sortBy, clazz, offset, size, null);
+    }
+
+    /**
+     * Counts the items of type {@code itemType} that match {@code query}.
+     */
+    @Override
+    public long queryCount(Condition query, String itemType) {
+        final FilterBuilder conditionFilter = conditionESQueryBuilderDispatcher.buildFilter(query);
+        return queryCount(conditionFilter, itemType);
+    }
+
+    /**
+     * Counts the items of type {@code itemType} accepted by {@code filter}. The count is read
+     * from a filter aggregation named "filter" attached to a match-all search of type COUNT.
+     */
+    private long queryCount(final FilterBuilder filter, final String itemType) {
+        return new InClassLoaderExecute<Long>() {
+
+            @Override
+            protected Long execute(Object... args) {
+                SearchRequestBuilder countRequest = client.prepareSearch(getIndexNameForQuery(itemType))
+                        .setTypes(itemType)
+                        .setSearchType(SearchType.COUNT)
+                        .setQuery(QueryBuilders.matchAllQuery());
+                countRequest.addAggregation(AggregationBuilders.filter("filter").filter(filter));
+
+                Aggregations aggregations = countRequest.execute().actionGet().getAggregations();
+                Filter filterResult = aggregations.get("filter");
+                return filterResult.getDocCount();
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Runs {@code query} against the index (or indices) holding items of type {@code clazz}
+     * and deserializes each hit into an instance of {@code clazz}, setting its item id from
+     * the hit id.
+     *
+     * @param query   the Elasticsearch query to run
+     * @param sortBy  comma-separated sort specification (see {@code addSortClauses}), or
+     *                {@code null} for no explicit sort
+     * @param clazz   item class; its ITEM_TYPE constant selects the document type
+     * @param offset  index of the first hit to return
+     * @param size    maximum number of hits, or -1 for no limit
+     * @param routing routing values, or {@code null} for none
+     * @return the hits loaded so far together with the total hit count; any exception is
+     *         logged and results in whatever was collected before it occurred
+     */
+    private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing) {
+        return new InClassLoaderExecute<PartialList<T>>() {
+
+            @Override
+            protected PartialList<T> execute(Object... args) {
+                List<T> results = new ArrayList<T>();
+                long totalHits = 0;
+                try {
+                    String itemType = getItemType(clazz);
+
+                    SearchRequestBuilder requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
+                            .setTypes(itemType)
+                            .setFetchSource(true)
+                            .setQuery(query)
+                            .setFrom(offset)
+                            // a size of -1 means "no limit"
+                            .setSize(size != -1 ? size : Integer.MAX_VALUE);
+                    if (routing != null) {
+                        requestBuilder.setRouting(routing);
+                    }
+                    if (sortBy != null) {
+                        requestBuilder = addSortClauses(requestBuilder, sortBy);
+                    }
+                    SearchHits searchHits = requestBuilder.execute().actionGet().getHits();
+                    totalHits = searchHits.getTotalHits();
+                    for (SearchHit searchHit : searchHits) {
+                        final T value = CustomObjectMapper.getObjectMapper().readValue(searchHit.getSourceAsString(), clazz);
+                        value.setItemId(searchHit.getId());
+                        results.add(value);
+                    }
+                } catch (Exception t) {
+                    logger.error("Error loading itemType=" + clazz.getName() + " query=" + query, t);
+                }
+
+                return new PartialList<T>(results, offset, size, totalHits);
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Adds one sort clause to {@code requestBuilder} per comma-separated element of
+     * {@code sortBy}. An element is either {@code field}, {@code field:asc}, {@code field:desc},
+     * or {@code geo:field:number:number[:desc]} for a distance sort in kilometers from the
+     * point built from the two numbers (passed to {@code point()} in that order).
+     *
+     * @return the request builder with the sort clauses added
+     */
+    private SearchRequestBuilder addSortClauses(SearchRequestBuilder requestBuilder, String sortBy) {
+        for (String sortByElement : sortBy.split(",")) {
+            if (sortByElement.startsWith("geo:")) {
+                String[] elements = sortByElement.split(":");
+                GeoDistanceSortBuilder distanceSortBuilder = SortBuilders.geoDistanceSort(elements[1])
+                        .point(Double.parseDouble(elements[2]), Double.parseDouble(elements[3]))
+                        .unit(DistanceUnit.KILOMETERS);
+                boolean descending = elements.length > 4 && elements[4].equals("desc");
+                requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(descending ? SortOrder.DESC : SortOrder.ASC));
+            } else if (sortByElement.endsWith(":desc")) {
+                requestBuilder = requestBuilder.addSort(sortByElement.substring(0, sortByElement.length() - ":desc".length()), SortOrder.DESC);
+            } else if (sortByElement.endsWith(":asc")) {
+                requestBuilder = requestBuilder.addSort(sortByElement.substring(0, sortByElement.length() - ":asc".length()), SortOrder.ASC);
+            } else {
+                // no explicit direction: ascending
+                requestBuilder = requestBuilder.addSort(sortByElement, SortOrder.ASC);
+            }
+        }
+        return requestBuilder;
+    }
+
+    /**
+     * Counts the items of type {@code itemType}, optionally restricted by {@code filter} and
+     * grouped into buckets described by {@code aggregate}.
+     * <p>
+     * The returned map is insertion-ordered and may contain: "_all" (document count of the
+     * global aggregation), "_filtered" (document count of the filter aggregation, only when a
+     * filter was given), one entry per bucket key, and "_missing" (count of the "missing"
+     * aggregation on the aggregate's field, only when greater than zero).
+     *
+     * @param filter    condition restricting the counted items, or {@code null} for none
+     * @param aggregate bucket definition, or {@code null} for no bucketing
+     * @param itemType  the item type to search
+     * @return the counts keyed as described above
+     */
+    @Override
+    public Map<String, Long> aggregateQuery(final Condition filter, final 
BaseAggregate aggregate, final String itemType) {
+        return new InClassLoaderExecute<Map<String, Long>>() {
+
+            @Override
+            protected Map<String, Long> execute(Object... args) {
+                Map<String, Long> results = new LinkedHashMap<String, Long>();
+
+                SearchRequestBuilder builder = 
client.prepareSearch(getIndexNameForQuery(itemType))
+                        .setTypes(itemType)
+                        .setSearchType(SearchType.COUNT)
+                        .setQuery(QueryBuilders.matchAllQuery());
+
+                // Innermost aggregations built so far; each later stage wraps them as
+                // sub-aggregations (buckets/missing -> filter -> global).
+                List<AggregationBuilder> lastAggregation = new 
ArrayList<AggregationBuilder>();
+
+                if (aggregate != null) {
+                    AggregationBuilder bucketsAggregation = null;
+                    if (aggregate instanceof DateAggregate) {
+                        DateAggregate dateAggregate = (DateAggregate) 
aggregate;
+                        DateHistogramBuilder dateHistogramBuilder = 
AggregationBuilders.dateHistogram("buckets").field(aggregate.getField()).interval(new
 DateHistogram.Interval((dateAggregate.getInterval())));
+                        if (dateAggregate.getFormat() != null) {
+                            
dateHistogramBuilder.format(dateAggregate.getFormat());
+                        }
+                        bucketsAggregation = dateHistogramBuilder;
+                    } else if (aggregate instanceof NumericRangeAggregate) {
+                        RangeBuilder rangebuilder = 
AggregationBuilders.range("buckets").field(aggregate.getField());
+                        for (NumericRange range : ((NumericRangeAggregate) 
aggregate).getRanges()) {
+                            if (range != null) {
+                                // a range with neither bound set is skipped
+                                if (range.getFrom() != null && range.getTo() 
!= null) {
+                                    rangebuilder.addRange(range.getKey(), 
range.getFrom(), range.getTo());
+                                } else if (range.getFrom() != null) {
+                                    
rangebuilder.addUnboundedFrom(range.getKey(), range.getFrom());
+                                } else if (range.getTo() != null) {
+                                    
rangebuilder.addUnboundedTo(range.getKey(), range.getTo());
+                                }
+                            }
+                        }
+                        bucketsAggregation = rangebuilder;
+                    } else if (aggregate instanceof DateRangeAggregate) {
+                        DateRangeAggregate dateRangeAggregate = 
(DateRangeAggregate) aggregate;
+                        DateRangeBuilder rangebuilder = 
AggregationBuilders.dateRange("buckets").field(aggregate.getField());
+                        if (dateRangeAggregate.getFormat() != null) {
+                            
rangebuilder.format(dateRangeAggregate.getFormat());
+                        }
+                        for (DateRange range : 
dateRangeAggregate.getDateRanges()) {
+                            if (range != null) {
+                                rangebuilder.addRange(range.getKey(), 
range.getFrom(), range.getTo());
+                            }
+                        }
+                        bucketsAggregation = rangebuilder;
+                    } else if (aggregate instanceof IpRangeAggregate) {
+                        IpRangeAggregate ipRangeAggregate = (IpRangeAggregate) 
aggregate;
+                        IPv4RangeBuilder rangebuilder = 
AggregationBuilders.ipRange("buckets").field(aggregate.getField());
+                        for (IpRange range : ipRangeAggregate.getRanges()) {
+                            if (range != null) {
+                                rangebuilder.addRange(range.getKey(), 
range.getFrom(), range.getTo());
+                            }
+                        }
+                        bucketsAggregation = rangebuilder;
+                    } else {
+                        //default
+                        bucketsAggregation = 
AggregationBuilders.terms("buckets").field(aggregate.getField()).size(Integer.MAX_VALUE);
+                    }
+                    if (bucketsAggregation != null) {
+                        final MissingBuilder missingBucketsAggregation = 
AggregationBuilders.missing("missing").field(aggregate.getField());
+                        // lastAggregation is still the empty list created above, so this
+                        // loop currently adds nothing.
+                        for (AggregationBuilder aggregationBuilder : 
lastAggregation) {
+                            
bucketsAggregation.subAggregation(aggregationBuilder);
+                            
missingBucketsAggregation.subAggregation(aggregationBuilder);
+                        }
+                        lastAggregation = Arrays.asList(bucketsAggregation, 
missingBucketsAggregation);
+                    }
+                }
+
+                if (filter != null) {
+                    AggregationBuilder filterAggregation = 
AggregationBuilders.filter("filter").filter(conditionESQueryBuilderDispatcher.buildFilter(filter));
+                    for (AggregationBuilder aggregationBuilder : 
lastAggregation) {
+                        filterAggregation.subAggregation(aggregationBuilder);
+                    }
+                    lastAggregation = 
Collections.singletonList(filterAggregation);
+                }
+
+
+                AggregationBuilder globalAggregation = 
AggregationBuilders.global("global");
+                for (AggregationBuilder aggregationBuilder : lastAggregation) {
+                    globalAggregation.subAggregation(aggregationBuilder);
+                }
+
+                builder.addAggregation(globalAggregation);
+
+                SearchResponse response = builder.execute().actionGet();
+
+                // Unwrap the response in the same order the aggregations were nested:
+                // global, then the optional filter, then buckets and missing.
+                Aggregations aggregations = response.getAggregations();
+                if (aggregations != null) {
+                    Global globalAgg = aggregations.get("global");
+                    results.put("_all", globalAgg.getDocCount());
+                    aggregations = globalAgg.getAggregations();
+
+                    if (aggregations.get("filter") != null) {
+                        Filter filterAgg = aggregations.get("filter");
+                        results.put("_filtered", filterAgg.getDocCount());
+                        aggregations = filterAgg.getAggregations();
+                    }
+                    if (aggregations.get("buckets") != null) {
+                        MultiBucketsAggregation terms = 
aggregations.get("buckets");
+                        for (MultiBucketsAggregation.Bucket bucket : 
terms.getBuckets()) {
+                            results.put(bucket.getKey(), bucket.getDocCount());
+                        }
+                        // "missing" is always requested together with "buckets" above
+                        SingleBucketAggregation missing = 
aggregations.get("missing");
+                        if (missing.getDocCount() > 0) {
+                            results.put("_missing", missing.getDocCount());
+                        }
+                    }
+                }
+
+                return results;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Reads the public static {@code ITEM_TYPE} field of {@code clazz} through reflection.
+     *
+     * @return the field value, or {@code null} (after logging an error) when the field is
+     *         missing or not accessible
+     */
+    private <T extends Item> String getItemType(Class<T> clazz) {
+        String itemType = null;
+        try {
+            itemType = (String) clazz.getField("ITEM_TYPE").get(null);
+        } catch (NoSuchFieldException e) {
+            logger.error("Class " + clazz.getName() + " doesn't define a publicly accessible ITEM_TYPE field", e);
+        } catch (IllegalAccessException e) {
+            logger.error("Error loading itemType=" + clazz.getName(), e);
+        }
+        return itemType;
+    }
+
+    private <T extends Item> String[] getRouting(String fieldName, String[] 
fieldValues, Class<T> clazz) {
+        String itemType = getItemType(clazz);
+        String[] routing = null;
+        if (routingByType.containsKey(itemType) && 
routingByType.get(itemType).equals(fieldName)) {
+            routing = fieldValues;
+        }
+        return routing;
+    }
+
+
+    /**
+     * Lists the cluster nodes that declare a "node.contextserver.address" setting, filled in
+     * with their addresses, ports, master/data roles and, when available, CPU load, load
+     * average and uptime taken from the node statistics.
+     *
+     * @return the matching nodes, in the order the node infos were returned
+     */
+    @Override
+    public List<ClusterNode> getClusterNodes() {
+        return new InClassLoaderExecute<List<ClusterNode>>() {
+
+            @Override
+            protected List<ClusterNode> execute(Object... args) {
+                // keyed by node id so that the statistics can be attached afterwards
+                final Map<String, ClusterNode> nodesById = new LinkedHashMap<String, ClusterNode>();
+
+                NodesInfoResponse infoResponse = client.admin().cluster().prepareNodesInfo(NodesOperationRequest.ALL_NODES)
+                        .setSettings(true)
+                        .execute()
+                        .actionGet();
+                for (NodeInfo info : infoResponse.getNodes()) {
+                    final String address = info.getSettings().get("node.contextserver.address");
+                    if (address == null) {
+                        // nodes without this setting are left out
+                        continue;
+                    }
+                    ClusterNode node = new ClusterNode();
+                    node.setHostName(info.getHostname());
+                    node.setHostAddress(address);
+                    node.setPublicPort(Integer.parseInt(info.getSettings().get("node.contextserver.port")));
+                    node.setSecureHostAddress(info.getSettings().get("node.contextserver.secureAddress"));
+                    node.setSecurePort(Integer.parseInt(info.getSettings().get("node.contextserver.securePort")));
+                    node.setMaster(info.getNode().isMasterNode());
+                    node.setData(info.getNode().isDataNode());
+                    nodesById.put(info.getNode().getId(), node);
+                }
+
+                NodesStatsResponse statsResponse = client.admin().cluster().prepareNodesStats(NodesOperationRequest.ALL_NODES)
+                        .setOs(true)
+                        .setProcess(true)
+                        .execute()
+                        .actionGet();
+                for (NodeStats stats : statsResponse.getNodes()) {
+                    ClusterNode node = nodesById.get(stats.getNode().getId());
+                    if (node == null) {
+                        continue;
+                    }
+                    // Process and OS statistics may be null when Sigar did not initialize
+                    // properly, for example because the native libraries were not installed
+                    // or because the OSGi bundle was redeployed and Sigar failed to reload
+                    // its native libraries.
+                    if (stats.getProcess() != null && stats.getProcess().getCpu() != null) {
+                        node.setCpuLoad(stats.getProcess().getCpu().getPercent());
+                    }
+                    if (stats.getOs() != null) {
+                        node.setLoadAverage(stats.getOs().getLoadAverage());
+                        node.setUptime(stats.getOs().getUptime().getMillis());
+                    }
+                }
+
+                return new ArrayList<ClusterNode>(nodesById.values());
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Sends a refresh request built without any index name and waits for it to complete.
+     */
+    @Override
+    public void refresh() {
+        final InClassLoaderExecute<Boolean> refreshTask = new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                client.admin().indices().refresh(Requests.refreshRequest()).actionGet();
+                return Boolean.TRUE;
+            }
+        };
+        refreshTask.executeInClassLoader();
+    }
+
+
+    @Override
+    public void purge(final Date date) {
+        new InClassLoaderExecute<Object>() {
+            @Override
+            protected Object execute(Object... args) {
+                IndicesStatsResponse statsResponse = 
client.admin().indices().prepareStats(indexName + "-*")
+                        .setIndexing(false)
+                        .setGet(false)
+                        .setSearch(false)
+                        .setWarmer(false)
+                        .setMerge(false)
+                        .setFieldData(false)
+                        .setFlush(false)
+                        .setCompletion(false)
+                        .setRefresh(false)
+                        .setSuggest(false)
+                        .execute()
+                        .actionGet();
+
+                SimpleDateFormat d = new SimpleDateFormat("yyyy-MM");
+
+                List<String> toDelete = new ArrayList<String>();
+                for (String currentIndexName : 
statsResponse.getIndices().keySet()) {
+                    if (currentIndexName.startsWith(indexName + "-")) {
+                        try {
+                            Date indexDate = 
d.parse(currentIndexName.substring(indexName.length() + 1));
+
+                            if (indexDate.before(date)) {
+                                toDelete.add(currentIndexName);
+                            }
+                        } catch (ParseException e) {
+                            logger.error("Cannot parse index name " + 
currentIndexName, e);
+                        }
+                    }
+                }
+                if (!toDelete.isEmpty()) {
+                    
client.admin().indices().prepareDelete(toDelete.toArray(new 
String[toDelete.size()])).execute().actionGet();
+                }
+                return null;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Deletes every document whose "scope" field matches {@code scope} (ASCII-folded) from
+     * all indices whose name starts with the main index name. Matching documents are
+     * collected with a scan/scroll search and removed with a single bulk request.
+     *
+     * @param scope the scope whose documents are removed
+     */
+    @Override
+    public void purge(final String scope) {
+        new InClassLoaderExecute<Void>() {
+            @Override
+            protected Void execute(Object... args) {
+                QueryBuilder query = QueryBuilders.termQuery("scope", ConditionContextHelper.foldToASCII(scope));
+
+                BulkRequestBuilder deleteByScope = client.prepareBulk();
+
+                final TimeValue keepAlive = TimeValue.timeValueHours(1);
+                SearchResponse response = client.prepareSearch(indexName + "*")
+                        .setSearchType(SearchType.SCAN)
+                        .setScroll(keepAlive)
+                        .setQuery(query)
+                        .setSize(100).execute().actionGet();
+
+                // Queue the hits of the current page, then fetch the next one; stop as soon
+                // as a scroll page comes back empty.
+                do {
+                    for (SearchHit hit : response.getHits().getHits()) {
+                        deleteByScope.add(Requests.deleteRequest(hit.index()).type(hit.type()).id(hit.id()));
+                    }
+                    response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
+                } while (response.getHits().getHits().length != 0);
+
+                // we're done with the scrolling, delete now
+                if (deleteByScope.numberOfActions() > 0) {
+                    final BulkResponse deleteResponse = deleteByScope.get();
+                    if (deleteResponse.hasFailures()) {
+                        // Documents that could not be deleted remain in the index, so report
+                        // this as an error rather than hiding it at debug level.
+                        logger.error("Couldn't delete from scope {}:\n{}", scope, deleteResponse.buildFailureMessage());
+                    }
+                }
+
+                return null;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Computes the requested single-value metrics of {@code field} over the items of type
+     * {@code itemType} that match {@code condition}.
+     *
+     * @param condition restricts the items the metrics are computed on
+     * @param metrics   any of "sum", "avg", "min", "max"; other names are ignored; may be
+     *                  {@code null}
+     * @param field     the field the metrics are computed on
+     * @param itemType  the item type to search
+     * @return metric values keyed by the metric name prefixed with "_"
+     */
+    @Override
+    public Map<String, Double> getSingleValuesMetrics(final Condition condition, final String[] metrics, final String field, final String itemType) {
+        return new InClassLoaderExecute<Map<String, Double>>() {
+
+            @Override
+            protected Map<String, Double> execute(Object... args) {
+                final Map<String, Double> values = new LinkedHashMap<String, Double>();
+
+                SearchRequestBuilder request = client.prepareSearch(getIndexNameForQuery(itemType))
+                        .setTypes(itemType)
+                        .setSearchType(SearchType.COUNT)
+                        .setQuery(QueryBuilders.matchAllQuery());
+                // every metric is a sub-aggregation of a filter aggregation named "metrics"
+                AggregationBuilder metricsFilter = AggregationBuilders.filter("metrics").filter(conditionESQueryBuilderDispatcher.buildFilter(condition));
+
+                if (metrics != null) {
+                    for (String metric : metrics) {
+                        if (metric.equals("sum")) {
+                            metricsFilter.subAggregation(AggregationBuilders.sum("sum").field(field));
+                        } else if (metric.equals("avg")) {
+                            metricsFilter.subAggregation(AggregationBuilders.avg("avg").field(field));
+                        } else if (metric.equals("min")) {
+                            metricsFilter.subAggregation(AggregationBuilders.min("min").field(field));
+                        } else if (metric.equals("max")) {
+                            metricsFilter.subAggregation(AggregationBuilders.max("max").field(field));
+                        }
+                    }
+                }
+                request.addAggregation(metricsFilter);
+
+                Aggregations topLevel = request.execute().actionGet().getAggregations();
+                if (topLevel == null) {
+                    return values;
+                }
+                Aggregation metricsResult = topLevel.get("metrics");
+                if (!(metricsResult instanceof HasAggregations)) {
+                    return values;
+                }
+                for (Aggregation computed : ((HasAggregations) metricsResult).getAggregations()) {
+                    InternalNumericMetricsAggregation.SingleValue single = (InternalNumericMetricsAggregation.SingleValue) computed;
+                    values.put("_" + single.getName(), single.value());
+                }
+                return values;
+            }
+        }.executeInClassLoader();
+    }
+
+    /**
+     * Resolves the index name (or pattern) to search for {@code itemType}: the dedicated index
+     * registered for the type if there is one, otherwise the {@code <indexName>-*} pattern for
+     * types listed in {@code itemsMonthlyIndexed}, otherwise the main index.
+     */
+    private String getIndexNameForQuery(String itemType) {
+        if (indexNames.containsKey(itemType)) {
+            return indexNames.get(itemType);
+        }
+        if (itemsMonthlyIndexed.contains(itemType)) {
+            return indexName + "-*";
+        }
+        return indexName;
+    }
+
+    /**
+     * Runs a piece of work with the thread context class loader temporarily set to the class
+     * loader of the concrete subclass, restoring the previous context class loader afterwards.
+     *
+     * @param <T> type of the value produced by the work
+     */
+    public abstract static class InClassLoaderExecute<T> {
+
+        /**
+         * The work to run; called by {@link #executeInClassLoader(Object...)} with the same
+         * arguments.
+         */
+        protected abstract T execute(Object... args);
+
+        /**
+         * Switches the context class loader, calls {@link #execute(Object...)} and restores
+         * the previous context class loader, also when the work throws.
+         *
+         * @param args passed through to {@code execute}
+         * @return whatever {@code execute} returns
+         */
+        public T executeInClassLoader(Object... args) {
+            final Thread currentThread = Thread.currentThread();
+            final ClassLoader previousLoader = currentThread.getContextClassLoader();
+            try {
+                currentThread.setContextClassLoader(getClass().getClassLoader());
+                return execute(args);
+            } finally {
+                currentThread.setContextClassLoader(previousLoader);
+            }
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionContextHelper.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionContextHelper.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionContextHelper.java
new file mode 100644
index 0000000..3666d8d
--- /dev/null
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionContextHelper.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.persistence.elasticsearch.conditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.unomi.api.conditions.Condition;
+import org.elasticsearch.common.base.Function;
+import org.elasticsearch.common.collect.Lists;
+import org.mvel2.MVEL;
+import org.mvel2.ParserConfiguration;
+import org.mvel2.ParserContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Static helpers used when building queries from, or evaluating, conditions:
+ * resolution of contextual parameter values ({@code parameter::} and {@code script::}
+ * references) and lower-casing / ASCII folding of text values.
+ */
+public class ConditionContextHelper {
+    /** Prefix marking a value to be looked up by name in the context map. */
+    private static final String PARAMETER_PREFIX = "parameter::";
+    /** Prefix marking a value to be evaluated as an MVEL expression against the context map. */
+    private static final String SCRIPT_PREFIX = "script::";
+
+    /** Compiled MVEL expressions, keyed by their script source. */
+    private static final Map<String, Serializable> mvelExpressions = new ConcurrentHashMap<>();
+
+    /**
+     * Produces a copy of the condition whose contextual parameter references have been
+     * replaced by values taken from the context.
+     *
+     * @param condition the condition whose parameter values may contain contextual references
+     * @param context   the values available for resolution
+     * @return the given condition itself when the context is empty or no parameter value is
+     *         contextual; {@code null} when the parameter values resolve to {@code null};
+     *         otherwise a new condition of the same type carrying the resolved values
+     */
+    public static Condition getContextualCondition(Condition condition, Map<String, Object> context) {
+        if (context.isEmpty() || !hasContextualParameter(condition.getParameterValues())) {
+            return condition;
+        }
+        @SuppressWarnings("unchecked")
+        Map<String, Object> resolved = (Map<String, Object>) parseParameter(context, condition.getParameterValues());
+        if (resolved == null) {
+            return null;
+        }
+        Condition contextual = new Condition(condition.getConditionType());
+        contextual.setParameterValues(resolved);
+        return contextual;
+    }
+
+    /**
+     * Recursively resolves a parameter value against the context.
+     * <ul>
+     * <li>A string starting with {@code parameter::} is replaced by the context entry of that name.</li>
+     * <li>A string starting with {@code script::} is replaced by the result of the MVEL expression.</li>
+     * <li>A map is resolved entry by entry; if any entry resolves to {@code null} the whole map
+     * resolves to {@code null}.</li>
+     * <li>A list is resolved element by element; elements resolving to {@code null} are dropped.</li>
+     * <li>Any other value is returned unchanged.</li>
+     * </ul>
+     *
+     * @param context the values available for resolution
+     * @param value   the parameter value to resolve
+     * @return the resolved value, possibly {@code null}
+     */
+    @SuppressWarnings("unchecked")
+    private static Object parseParameter(Map<String, Object> context, Object value) {
+        if (value instanceof String) {
+            String s = (String) value;
+            if (s.startsWith(PARAMETER_PREFIX)) {
+                return context.get(StringUtils.substringAfter(s, PARAMETER_PREFIX));
+            }
+            if (s.startsWith(SCRIPT_PREFIX)) {
+                String script = StringUtils.substringAfter(s, SCRIPT_PREFIX);
+                // Read the cache once so the compiled expression that gets executed is the one
+                // that was just looked up or compiled.
+                Serializable compiled = mvelExpressions.get(script);
+                if (compiled == null) {
+                    ParserConfiguration parserConfiguration = new ParserConfiguration();
+                    parserConfiguration.setClassLoader(ConditionContextHelper.class.getClassLoader());
+                    compiled = MVEL.compileExpression(script, new ParserContext(parserConfiguration));
+                    mvelExpressions.put(script, compiled);
+                }
+                return MVEL.executeExpression(compiled, context);
+            }
+            return value;
+        }
+        if (value instanceof Map) {
+            Map<String, Object> values = new HashMap<String, Object>();
+            for (Map.Entry<String, Object> entry : ((Map<String, Object>) value).entrySet()) {
+                Object parameter = parseParameter(context, entry.getValue());
+                if (parameter == null) {
+                    // One unresolvable entry makes the whole map unresolvable.
+                    return null;
+                }
+                values.put(entry.getKey(), parameter);
+            }
+            return values;
+        }
+        if (value instanceof List) {
+            List<Object> values = new ArrayList<Object>();
+            for (Object o : ((List<?>) value)) {
+                Object parameter = parseParameter(context, o);
+                if (parameter != null) {
+                    values.add(parameter);
+                }
+            }
+            return values;
+        }
+        return value;
+    }
+
+    /**
+     * Tells whether a value is, or contains at any depth of nested maps and lists, a string
+     * starting with {@code parameter::} or {@code script::}.
+     *
+     * @param value the value to inspect
+     * @return {@code true} if a contextual reference was found
+     */
+    private static boolean hasContextualParameter(Object value) {
+        if (value instanceof String) {
+            String s = (String) value;
+            return s.startsWith(PARAMETER_PREFIX) || s.startsWith(SCRIPT_PREFIX);
+        }
+        if (value instanceof Map) {
+            return anyContextualParameter(((Map<?, ?>) value).values());
+        }
+        if (value instanceof List) {
+            return anyContextualParameter((List<?>) value);
+        }
+        return false;
+    }
+
+    /** Returns {@code true} as soon as one of the values is or contains a contextual reference. */
+    private static boolean anyContextualParameter(Iterable<?> values) {
+        for (Object o : values) {
+            if (hasContextualParameter(o)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Lower-cases and ASCII-folds every element of the array <em>in place</em>.
+     *
+     * @param s the array to fold, may be {@code null}
+     * @return the same array instance, or {@code null} if {@code null} was given
+     */
+    public static String[] foldToASCII(String[] s) {
+        if (s != null) {
+            for (int i = 0; i < s.length; i++) {
+                s[i] = foldToASCII(s[i]);
+            }
+        }
+        return s;
+    }
+
+    /**
+     * Lower-cases the string and folds it to ASCII with Lucene's {@link ASCIIFoldingFilter}.
+     * Lower-casing uses {@link java.util.Locale#ROOT} so the result does not depend on the
+     * default locale of the JVM (for instance, under a Turkish default locale
+     * {@code "I"} would otherwise become a dotless i).
+     *
+     * @param s the string to fold, may be {@code null}
+     * @return the folded string, or {@code null} if {@code null} was given
+     */
+    public static String foldToASCII(String s) {
+        if (s == null) {
+            return null;
+        }
+        String lowered = s.toLowerCase(java.util.Locale.ROOT);
+        // The output buffer is sized to four times the input length before folding.
+        int maxSizeNeeded = 4 * lowered.length();
+        char[] output = new char[ArrayUtil.oversize(maxSizeNeeded, 2)];
+        int length = ASCIIFoldingFilter.foldToASCII(lowered.toCharArray(), 0, output, 0, lowered.length());
+        return new String(output, 0, length);
+    }
+
+    /**
+     * Applies {@link #foldToASCII(String)} to every {@code String} element of the list and
+     * leaves elements of any other type untouched.
+     * NOTE(review): Guava documents {@code Lists.transform} as returning a lazily evaluated
+     * view of the source list; confirm this holds for the Elasticsearch-shaded copy used here.
+     *
+     * @param s   the list to fold, may be {@code null}
+     * @param <T> the element type of the list
+     * @return the transformed list, or {@code null} if {@code null} was given
+     */
+    public static <T> List<T> foldToASCII(List<T> s) {
+        if (s == null) {
+            return null;
+        }
+        return Lists.transform(s, new Function<T, T>() {
+            @Override
+            @SuppressWarnings("unchecked") // a String element is replaced by another String
+            public T apply(T o) {
+                if (o instanceof String) {
+                    return (T) ConditionContextHelper.foldToASCII((String) o);
+                }
+                return o;
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
new file mode 100644
index 0000000..cc2dc89
--- /dev/null
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.persistence.elasticsearch.conditions;
+
+import org.apache.unomi.api.conditions.Condition;
+import org.elasticsearch.index.query.FilterBuilder;
+
+import java.util.Map;
+
+/**
+ * Builds an Elasticsearch filter from a condition. Implementations are registered by name
+ * with a {@link ConditionESQueryBuilderDispatcher}, which selects one based on the query
+ * builder key declared by the condition's type.
+ */
+public interface ConditionESQueryBuilder {
+
+    /**
+     * Builds the filter for a condition.
+     *
+     * @param condition  the condition to translate; the dispatcher passes the condition
+     *                   returned by {@code ConditionContextHelper.getContextualCondition}
+     * @param context    the context values the dispatcher was called with
+     * @param dispatcher the dispatcher invoking this builder
+     * @return the filter for the condition
+     */
+    FilterBuilder buildFilter(Condition condition, Map<String, Object> 
context, ConditionESQueryBuilderDispatcher dispatcher);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
new file mode 100644
index 0000000..526b49f
--- /dev/null
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.persistence.elasticsearch.conditions;
+
+import org.apache.unomi.api.conditions.Condition;
+import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.FilteredQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Entry point for turning conditions into Elasticsearch filters and queries. Keeps a registry
+ * of named {@link ConditionESQueryBuilder}s, tracked per contributing bundle id, and delegates
+ * to the builder named by the condition's type.
+ */
+public class ConditionESQueryBuilderDispatcher {
+    private static final Logger logger = LoggerFactory.getLogger(ConditionESQueryBuilderDispatcher.class.getName());
+
+    private BundleContext bundleContext;
+    /** Registered builders, keyed by query builder name. */
+    private Map<String, ConditionESQueryBuilder> queryBuilders = new ConcurrentHashMap<>();
+    /** Names of the builders contributed by each bundle, keyed by bundle id. */
+    private Map<Long, List<String>> queryBuildersByBundle = new ConcurrentHashMap<>();
+
+    public ConditionESQueryBuilderDispatcher() {
+    }
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
+
+    /**
+     * Registers a builder under a name and records it as contributed by the given bundle.
+     *
+     * @param name      the query builder key under which the builder is registered
+     * @param bundleId  the id of the contributing bundle
+     * @param evaluator the builder to register
+     */
+    public void addQueryBuilder(String name, long bundleId, ConditionESQueryBuilder evaluator) {
+        queryBuilders.put(name, evaluator);
+        List<String> namesForBundle = queryBuildersByBundle.get(bundleId);
+        if (namesForBundle == null) {
+            namesForBundle = new ArrayList<String>();
+            queryBuildersByBundle.put(bundleId, namesForBundle);
+        }
+        namesForBundle.add(name);
+    }
+
+    /**
+     * Unregisters every builder recorded for the given bundle; does nothing when the bundle
+     * contributed none.
+     *
+     * @param bundleId the id of the bundle whose builders are removed
+     */
+    public void removeQueryBuilders(long bundleId) {
+        List<String> namesForBundle = queryBuildersByBundle.get(bundleId);
+        if (namesForBundle == null) {
+            return;
+        }
+        for (String builderName : namesForBundle) {
+            queryBuilders.remove(builderName);
+        }
+        queryBuildersByBundle.remove(bundleId);
+    }
+
+    /**
+     * Renders the query for a condition as a JSON string of the form {@code {"query": ...}}.
+     *
+     * @param condition the condition to translate
+     * @return the JSON text of the query
+     */
+    public String getQuery(Condition condition) {
+        FilteredQueryBuilder filteredQuery = getQueryBuilder(condition);
+        return "{\"query\": " + filteredQuery.toString() + "}";
+    }
+
+    /**
+     * Wraps the filter of a condition in a filtered query over a match-all query.
+     *
+     * @param condition the condition to translate
+     * @return the filtered query builder
+     */
+    public FilteredQueryBuilder getQueryBuilder(Condition condition) {
+        FilterBuilder filter = buildFilter(condition);
+        return QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filter);
+    }
+
+    /**
+     * Builds the filter of a condition, starting from a fresh, empty context.
+     *
+     * @param condition the condition to translate
+     * @return the filter for the condition
+     */
+    public FilterBuilder buildFilter(Condition condition) {
+        Map<String, Object> emptyContext = new HashMap<String, Object>();
+        return buildFilter(condition, emptyContext);
+    }
+
+    /**
+     * Builds the filter of a condition.
+     * <p>
+     * When the condition type names no query builder but has a parent condition, the
+     * condition's parameter values are added to the context and the parent condition is
+     * translated instead. When no builder is registered under the named key, or the
+     * contextual condition resolves to {@code null}, a match-all filter is returned.
+     *
+     * @param condition the condition to translate
+     * @param context   the context values; this map is modified when delegating to a parent
+     * @return the filter for the condition
+     * @throws IllegalArgumentException      if the condition or its type is {@code null}
+     * @throws UnsupportedOperationException if the type names no query builder and has no parent
+     */
+    public FilterBuilder buildFilter(Condition condition, Map<String, Object> context) {
+        if (condition == null || condition.getConditionType() == null) {
+            throw new IllegalArgumentException("Condition is null or doesn't have type, impossible to build filter");
+        }
+
+        final String builderKey = condition.getConditionType().getQueryBuilder();
+        if (builderKey == null) {
+            if (condition.getConditionType().getParentCondition() != null) {
+                // The parameters of this condition become context for its parent condition.
+                context.putAll(condition.getParameterValues());
+                return buildFilter(condition.getConditionType().getParentCondition(), context);
+            }
+            throw new UnsupportedOperationException("No query builder defined for : " + condition.getConditionTypeId());
+        }
+
+        if (!queryBuilders.containsKey(builderKey)) {
+            // if no matching
+            return FilterBuilders.matchAllFilter();
+        }
+
+        ConditionESQueryBuilder registeredBuilder = queryBuilders.get(builderKey);
+        Condition resolvedCondition = ConditionContextHelper.getContextualCondition(condition, context);
+        if (resolvedCondition == null) {
+            return FilterBuilders.matchAllFilter();
+        }
+        return registeredBuilder.buildFilter(resolvedCondition, context, this);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluator.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluator.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluator.java
new file mode 100644
index 0000000..86f8f52
--- /dev/null
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.persistence.elasticsearch.conditions;
+
+import org.apache.unomi.api.Item;
+import org.apache.unomi.api.conditions.Condition;
+
+import java.util.Map;
+
+/**
+ * Condition evaluator interface. Implementations are registered by name with a
+ * {@link ConditionEvaluatorDispatcher}, which selects one based on the evaluator key
+ * declared by the condition's type.
+ */
+public interface ConditionEvaluator {
+
+    /**
+     * Evaluates a condition against an item.
+     *
+     * @param condition  the condition to evaluate; the dispatcher passes the condition
+     *                   returned by {@code ConditionContextHelper.getContextualCondition}
+     * @param item       the item the condition is evaluated against
+     * @param context    the context values the dispatcher was called with
+     * @param dispatcher the dispatcher invoking this evaluator
+     * @return the outcome of the evaluation
+     */
+    boolean eval(Condition condition, Item item, Map<String, Object> context, 
ConditionEvaluatorDispatcher dispatcher);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/dc1d1520/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluatorDispatcher.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluatorDispatcher.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluatorDispatcher.java
new file mode 100644
index 0000000..0341c46
--- /dev/null
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluatorDispatcher.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.persistence.elasticsearch.conditions;
+
+import org.apache.unomi.api.Item;
+import org.apache.unomi.api.conditions.Condition;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Entry point for condition evaluation. Keeps a registry of named {@link ConditionEvaluator}s,
+ * tracked per contributing bundle id, and delegates to the evaluator named by the
+ * condition's type.
+ */
+public class ConditionEvaluatorDispatcher {
+    private static final Logger logger = LoggerFactory.getLogger(ConditionEvaluatorDispatcher.class.getName());
+
+    private BundleContext bundleContext;
+    /** Registered evaluators, keyed by evaluator name. */
+    private Map<String, ConditionEvaluator> evaluators = new ConcurrentHashMap<>();
+    /** Names of the evaluators contributed by each bundle, keyed by bundle id. */
+    private Map<Long, List<String>> evaluatorsByBundle = new ConcurrentHashMap<>();
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
+
+    /**
+     * Registers an evaluator under a name and records it as contributed by the given bundle.
+     *
+     * @param name      the evaluator key under which the evaluator is registered
+     * @param bundleId  the id of the contributing bundle
+     * @param evaluator the evaluator to register
+     */
+    public void addEvaluator(String name, long bundleId, ConditionEvaluator evaluator) {
+        evaluators.put(name, evaluator);
+        List<String> namesForBundle = evaluatorsByBundle.get(bundleId);
+        if (namesForBundle == null) {
+            namesForBundle = new ArrayList<String>();
+            evaluatorsByBundle.put(bundleId, namesForBundle);
+        }
+        namesForBundle.add(name);
+    }
+
+    /**
+     * Unregisters every evaluator recorded for the given bundle; does nothing when the bundle
+     * contributed none.
+     *
+     * @param bundleId the id of the bundle whose evaluators are removed
+     */
+    public void removeEvaluators(long bundleId) {
+        List<String> namesForBundle = evaluatorsByBundle.get(bundleId);
+        if (namesForBundle == null) {
+            return;
+        }
+        for (String evaluatorName : namesForBundle) {
+            evaluators.remove(evaluatorName);
+        }
+        evaluatorsByBundle.remove(bundleId);
+    }
+
+    /**
+     * Evaluates a condition against an item, starting from a fresh, empty context.
+     *
+     * @param condition the condition to evaluate
+     * @param item      the item the condition is evaluated against
+     * @return the outcome of the evaluation
+     */
+    public boolean eval(Condition condition, Item item) {
+        Map<String, Object> emptyContext = new HashMap<String, Object>();
+        return eval(condition, item, emptyContext);
+    }
+
+    /**
+     * Evaluates a condition against an item.
+     * <p>
+     * When the condition type has a parent condition, the condition's parameter values are
+     * added to the context and the parent condition is evaluated instead. When the contextual
+     * condition resolves to {@code null} the result is {@code true}; when no evaluator is
+     * registered under the named key the result is {@code false}.
+     *
+     * @param condition the condition to evaluate
+     * @param item      the item the condition is evaluated against
+     * @param context   the context values; this map is modified when delegating to a parent
+     * @return the outcome of the evaluation
+     * @throws UnsupportedOperationException if the type has no parent and names no evaluator
+     */
+    public boolean eval(Condition condition, Item item, Map<String, Object> context) {
+        final String evaluatorKey = condition.getConditionType().getConditionEvaluator();
+
+        if (condition.getConditionType().getParentCondition() != null) {
+            // The parameters of this condition become context for its parent condition.
+            context.putAll(condition.getParameterValues());
+            return eval(condition.getConditionType().getParentCondition(), item, context);
+        }
+
+        if (evaluatorKey == null) {
+            throw new UnsupportedOperationException("No evaluator defined for : " + condition.getConditionTypeId());
+        }
+
+        if (!evaluators.containsKey(evaluatorKey)) {
+            // if no matching
+            return false;
+        }
+
+        ConditionEvaluator registeredEvaluator = evaluators.get(evaluatorKey);
+        Condition resolvedCondition = ConditionContextHelper.getContextualCondition(condition, context);
+        return resolvedCondition == null || registeredEvaluator.eval(resolvedCondition, item, context, this);
+    }
+}

Reply via email to