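- Fix the log message emitted while waiting for GREEN cluster status
- Add code to register the predefined mappings and the condition evaluators / query builders of bundles that were started before the ElasticSearch persistence service implementation
- Change the counting code to use a query and read the total hit count instead of using a filter aggregation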
Signed-off-by: Serge Huber <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/35b93979 Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/35b93979 Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/35b93979 Branch: refs/heads/feature-UNOMI-70-ES5X Commit: 35b9397931cd89dd6d021e65079c658d6fdf29e0 Parents: 952da61 Author: Serge Huber <[email protected]> Authored: Wed Dec 21 21:29:13 2016 +0100 Committer: Serge Huber <[email protected]> Committed: Wed Dec 21 21:29:13 2016 +0100 ---------------------------------------------------------------------- .../ElasticSearchPersistenceServiceImpl.java | 39 ++++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/35b93979/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 6c1d8e0..dc909b9 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -24,7 +24,10 @@ import org.apache.karaf.cellar.core.*; import org.apache.karaf.cellar.core.control.SwitchStatus; import org.apache.karaf.cellar.core.event.EventProducer; import org.apache.karaf.cellar.core.event.EventType; -import org.apache.unomi.api.*; +import 
org.apache.unomi.api.ClusterNode; +import org.apache.unomi.api.Item; +import org.apache.unomi.api.PartialList; +import org.apache.unomi.api.TimestampedItem; import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.query.DateRange; import org.apache.unomi.api.query.IpRange; @@ -79,10 +82,7 @@ import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.transport.client.PreBuiltTransportClient; -import org.osgi.framework.BundleContext; -import org.osgi.framework.BundleEvent; -import org.osgi.framework.ServiceReference; -import org.osgi.framework.SynchronousBundleListener; +import org.osgi.framework.*; import org.osgi.service.cm.ConfigurationAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -415,7 +415,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, refreshExistingIndexNames(); - logger.info("Waiting for index creation to complete..."); + logger.info("Waiting for GREEN cluster status..."); client.admin().cluster().prepareHealth() .setWaitForGreenStatus() @@ -458,6 +458,24 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } }, 10000L, 24L * 60L * 60L * 1000L); + // load predefined mappings and condition dispatchers of any bundles that were started before this one. 
+ for (Bundle existingBundle : bundleContext.getBundles()) { + if (existingBundle.getBundleContext() != null) { + loadPredefinedMappings(existingBundle.getBundleContext(), true); + } + if (existingBundle.getRegisteredServices() != null) { + for (ServiceReference<?> reference : existingBundle.getRegisteredServices()) { + Object service = bundleContext.getService(reference); + if (service instanceof ConditionEvaluator) { + conditionEvaluatorDispatcher.addEvaluator(reference.getProperty("conditionEvaluatorId").toString(), existingBundle.getBundleId(), (ConditionEvaluator) service); + } + if (service instanceof ConditionESQueryBuilder) { + conditionESQueryBuilderDispatcher.addQueryBuilder(reference.getProperty("queryBuilderId").toString(), existingBundle.getBundleId(), (ConditionESQueryBuilder) service); + } + } + } + } + logger.info(this.getClass().getName() + " service started successfully."); } @@ -649,7 +667,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } while (predefinedMappings.hasMoreElements()) { URL predefinedMappingURL = predefinedMappings.nextElement(); - logger.debug("Found mapping at " + predefinedMappingURL + ", loading... "); + logger.info("Found mapping at " + predefinedMappingURL + ", loading... "); try { final String path = predefinedMappingURL.getPath(); String name = path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.')); @@ -1218,13 +1236,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SearchResponse response = client.prepareSearch(getIndexNameForQuery(itemType)) .setTypes(itemType) .setSize(0) - .setQuery(QueryBuilders.matchAllQuery()) - .addAggregation(AggregationBuilders.filter("filter", filter)) + .setQuery(filter) .execute() .actionGet(); - Aggregations searchHits = response.getAggregations(); - Filter filter = searchHits.get("filter"); - return filter.getDocCount(); + return response.getHits().getTotalHits(); } }.executeInClassLoader(); }
