- Fix waiting for green status message
- Add code to register mappings and condition builders that were started before 
the ElasticSearch persistence service implementation
- Change counting code to use queries instead of aggregations

Signed-off-by: Serge Huber <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/35b93979
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/35b93979
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/35b93979

Branch: refs/heads/feature-UNOMI-70-ES5X
Commit: 35b9397931cd89dd6d021e65079c658d6fdf29e0
Parents: 952da61
Author: Serge Huber <[email protected]>
Authored: Wed Dec 21 21:29:13 2016 +0100
Committer: Serge Huber <[email protected]>
Committed: Wed Dec 21 21:29:13 2016 +0100

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 39 ++++++++++++++------
 1 file changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/35b93979/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 6c1d8e0..dc909b9 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -24,7 +24,10 @@ import org.apache.karaf.cellar.core.*;
 import org.apache.karaf.cellar.core.control.SwitchStatus;
 import org.apache.karaf.cellar.core.event.EventProducer;
 import org.apache.karaf.cellar.core.event.EventType;
-import org.apache.unomi.api.*;
+import org.apache.unomi.api.ClusterNode;
+import org.apache.unomi.api.Item;
+import org.apache.unomi.api.PartialList;
+import org.apache.unomi.api.TimestampedItem;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.query.DateRange;
 import org.apache.unomi.api.query.IpRange;
@@ -79,10 +82,7 @@ import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.BundleEvent;
-import org.osgi.framework.ServiceReference;
-import org.osgi.framework.SynchronousBundleListener;
+import org.osgi.framework.*;
 import org.osgi.service.cm.ConfigurationAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -415,7 +415,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
 
                 refreshExistingIndexNames();
 
-                logger.info("Waiting for index creation to complete...");
+                logger.info("Waiting for GREEN cluster status...");
 
                 client.admin().cluster().prepareHealth()
                         .setWaitForGreenStatus()
@@ -458,6 +458,24 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
             }
         }, 10000L, 24L * 60L * 60L * 1000L);
 
+        // load predefined mappings and condition dispatchers of any bundles 
that were started before this one.
+        for (Bundle existingBundle : bundleContext.getBundles()) {
+            if (existingBundle.getBundleContext() != null) {
+                loadPredefinedMappings(existingBundle.getBundleContext(), 
true);
+            }
+            if (existingBundle.getRegisteredServices() != null) {
+                for (ServiceReference<?> reference : 
existingBundle.getRegisteredServices()) {
+                    Object service = bundleContext.getService(reference);
+                    if (service instanceof ConditionEvaluator) {
+                        
conditionEvaluatorDispatcher.addEvaluator(reference.getProperty("conditionEvaluatorId").toString(),
 existingBundle.getBundleId(), (ConditionEvaluator) service);
+                    }
+                    if (service instanceof ConditionESQueryBuilder) {
+                        
conditionESQueryBuilderDispatcher.addQueryBuilder(reference.getProperty("queryBuilderId").toString(),
 existingBundle.getBundleId(), (ConditionESQueryBuilder) service);
+                    }
+                }
+            }
+        }
+
         logger.info(this.getClass().getName() + " service started 
successfully.");
     }
 
@@ -649,7 +667,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
         }
         while (predefinedMappings.hasMoreElements()) {
             URL predefinedMappingURL = predefinedMappings.nextElement();
-            logger.debug("Found mapping at " + predefinedMappingURL + ", 
loading... ");
+            logger.info("Found mapping at " + predefinedMappingURL + ", 
loading... ");
             try {
                 final String path = predefinedMappingURL.getPath();
                 String name = path.substring(path.lastIndexOf('/') + 1, 
path.lastIndexOf('.'));
@@ -1218,13 +1236,10 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                 SearchResponse response = 
client.prepareSearch(getIndexNameForQuery(itemType))
                         .setTypes(itemType)
                         .setSize(0)
-                        .setQuery(QueryBuilders.matchAllQuery())
-                        .addAggregation(AggregationBuilders.filter("filter", 
filter))
+                        .setQuery(filter)
                         .execute()
                         .actionGet();
-                Aggregations searchHits = response.getAggregations();
-                Filter filter = searchHits.get("filter");
-                return filter.getDocCount();
+                return response.getHits().getTotalHits();
             }
         }.executeInClassLoader();
     }

Reply via email to