Repository: incubator-unomi Updated Branches: refs/heads/master a4955cafa -> b1c88c4c4
UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates - Validate configuration functionality Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/a7fb48dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/a7fb48dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/a7fb48dd Branch: refs/heads/master Commit: a7fb48dd948d36e376a4fdfc8868fd51a9053bd0 Parents: 79b1b9c Author: Serge Huber <[email protected]> Authored: Tue Nov 15 14:13:46 2016 +0100 Committer: Serge Huber <[email protected]> Committed: Tue Nov 15 14:13:46 2016 +0100 ---------------------------------------------------------------------- .../ElasticSearchPersistenceServiceImpl.java | 85 +++++++++++++------- .../resources/OSGI-INF/blueprint/blueprint.xml | 13 +++ ...g.apache.unomi.persistence.elasticsearch.cfg | 6 ++ 3 files changed, 77 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a7fb48dd/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index d13f7b6..2208951 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -117,12 +117,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public static final String CONTEXTSERVER_PORT = "contextserver.port"; public static final String CONTEXTSERVER_SECURE_ADDRESS = "contextserver.secureAddress"; public static final String CONTEXTSERVER_SECURE_PORT = "contextserver.securePort"; - public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME = "contextserver.elasticsearch.bulkprocessor.name"; - public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST = "contextserver.elasticsearch.bulkprocessor.concurrentRequests"; - public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS = "contextserver.elasticsearch.bulkprocessor.bulkActions"; - public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE = "contextserver.elasticsearch.bulkprocessor.bulkSize"; - public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL = "contextserver.elasticsearch.bulkprocessor.flushInterval"; - public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY = "contextserver.elasticsearch.bulkprocessor.backoffPolicy"; public static final String KARAF_HOME = "karaf.home"; public static final String ELASTICSEARCH_HOME_DIRECTORY = "elasticsearch"; public static final String ELASTICSEARCH_PLUGINS_DIRECTORY = ELASTICSEARCH_HOME_DIRECTORY + "/plugins"; @@ -142,6 +136,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public static final String PATH_PLUGINS = "path.plugins"; public static final String INDEX_MAX_RESULT_WINDOW = "index.max_result_window"; public static final String MAPPER_ALLOW_DOTS_IN_NAME = "mapper.allow_dots_in_name"; + public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name"; + public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests"; + public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions"; + public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize"; + public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval"; + public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy"; private Node node; private Client client; @@ -172,6 +172,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Timer timer; + private String bulkProcessorName = "unomi-bulk"; + private String bulkProcessorConcurrentRequests = "1"; + private String bulkProcessorBulkActions = "1000"; + private String bulkProcessorBulkSize= "5MB"; + private String bulkProcessorFlushInterval = "5s"; + private String bulkProcessorBackoffPolicy = "exponential"; + public void setBundleContext(BundleContext bundleContext) { this.bundleContext = bundleContext; } @@ -252,6 +259,30 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.conditionESQueryBuilderDispatcher = conditionESQueryBuilderDispatcher; } + public void setBulkProcessorName(String bulkProcessorName) { + this.bulkProcessorName = bulkProcessorName; + } + + public void setBulkProcessorConcurrentRequests(String bulkProcessorConcurrentRequests) { + this.bulkProcessorConcurrentRequests = bulkProcessorConcurrentRequests; + } + + public void setBulkProcessorBulkActions(String bulkProcessorBulkActions) { + this.bulkProcessorBulkActions = bulkProcessorBulkActions; + } + + public void setBulkProcessorBulkSize(String bulkProcessorBulkSize) { + this.bulkProcessorBulkSize = bulkProcessorBulkSize; + } + + public void setBulkProcessorFlushInterval(String bulkProcessorFlushInterval) { + this.bulkProcessorFlushInterval = bulkProcessorFlushInterval; + } + + public void setBulkProcessorBackoffPolicy(String bulkProcessorBackoffPolicy) { + this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy; + } + public void start() { loadPredefinedMappings(bundleContext, false); @@ -282,6 +313,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, secureAddress = System.getProperty(CONTEXTSERVER_SECURE_ADDRESS, secureAddress); securePort = System.getProperty(CONTEXTSERVER_SECURE_PORT, securePort); + bulkProcessorName = System.getProperty(BULK_PROCESSOR_NAME, bulkProcessorName); + bulkProcessorConcurrentRequests = System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS, bulkProcessorConcurrentRequests); + bulkProcessorBulkActions = System.getProperty(BULK_PROCESSOR_BULK_ACTIONS, bulkProcessorBulkActions); + bulkProcessorBulkSize = System.getProperty(BULK_PROCESSOR_BULK_SIZE, bulkProcessorBulkSize); + bulkProcessorFlushInterval = System.getProperty(BULK_PROCESSOR_FLUSH_INTERVAL, bulkProcessorFlushInterval); + bulkProcessorBackoffPolicy = System.getProperty(BULK_PROCESSOR_BACKOFF_POLICY, bulkProcessorBackoffPolicy); + Settings.Builder settingsBuilder = Settings.builder(); if (settings != null) { settingsBuilder.put(settings); @@ -430,37 +468,30 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, // we could add index creation here in the case of index seperation by dates. } }); - if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME) != null) { - String name = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME); - if (name != null && name.length() > 0) { - bulkProcessorBuilder.setName(name); - } + if (bulkProcessorName != null && bulkProcessorName.length() > 0) { + bulkProcessorBuilder.setName(bulkProcessorName); } - if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST) != null) { - String concurrentRequestsStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST); - int concurrentRequests = Integer.parseInt(concurrentRequestsStr); + if (bulkProcessorConcurrentRequests != null) { + int concurrentRequests = Integer.parseInt(bulkProcessorConcurrentRequests); if (concurrentRequests > 1) { bulkProcessorBuilder.setConcurrentRequests(concurrentRequests); } } - if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS) != null) { - String bulkActionsStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS); - int bulkActions = Integer.parseInt(bulkActionsStr); + if (bulkProcessorBulkActions != null) { + int bulkActions = Integer.parseInt(bulkProcessorBulkActions); bulkProcessorBuilder.setBulkActions(bulkActions); } - if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE) != null) { - String bulkSizeStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE); - bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkSizeStr, new ByteSizeValue(5, ByteSizeUnit.MB), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE)); + if (bulkProcessorBulkSize != null) { + bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkProcessorBulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), BULK_PROCESSOR_BULK_SIZE)); } - if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL) != null) { - String flushIntervalStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL); - bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(flushIntervalStr, null, CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL)); + if (bulkProcessorFlushInterval != null) { + bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(bulkProcessorFlushInterval, null, BULK_PROCESSOR_FLUSH_INTERVAL)); } else { // in ElasticSearch this defaults to null, but we would like to set a value to 5 seconds by default bulkProcessorBuilder.setFlushInterval(new TimeValue(5, TimeUnit.SECONDS)); } - if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY) != null) { - String backoffPolicyStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY); + if (bulkProcessorBackoffPolicy != null) { + String backoffPolicyStr = bulkProcessorBackoffPolicy; if (backoffPolicyStr != null && backoffPolicyStr.length() > 0) { backoffPolicyStr = backoffPolicyStr.toLowerCase(); if ("nobackoff".equals(backoffPolicyStr)) { @@ -469,7 +500,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, int paramStartPos = backoffPolicyStr.indexOf("constant(" + "constant(".length()); int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos); int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos); - TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY ); + TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY); int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos)); bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, maxNumberOfRetries)); } else if (backoffPolicyStr.startsWith("exponential")) { @@ -480,7 +511,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, int paramStartPos = backoffPolicyStr.indexOf("exponential(" + "exponential(".length()); int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos); int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos); - TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY ); + TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY); int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos)); bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries)); } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a7fb48dd/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 5fbc9dd..75039d8 100644 --- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -40,6 +40,13 @@ <cm:property name="discovery.zen.ping.multicast.enabled" value="false"/> <cm:property name="node.data" value="true"/> <cm:property name="defaultQueryLimit" value="10"/> + + <cm:property name="bulkProcessor.name" value="unomi-bulk" /> + <cm:property name="bulkProcessor.concurrentRequests" value="1" /> + <cm:property name="bulkProcessor.bulkActions" value="1000" /> + <cm:property name="bulkProcessor.bulkSize" value="5MB" /> + <cm:property name="bulkProcessor.flushInterval" value="5s" /> + <cm:property name="bulkProcessor.backoffPolicy" value="exponential" /> </cm:default-properties> </cm:property-placeholder> @@ -106,6 +113,12 @@ <entry key="geonameEntry" value="geonames"/> </map> </property> + <property name="bulkProcessorName" value="${es.bulkProcessor.name}" /> + <property name="bulkProcessorConcurrentRequests" value="${es.bulkProcessor.concurrentRequests}" /> + <property name="bulkProcessorBulkActions" value="${es.bulkProcessor.bulkActions}" /> + <property name="bulkProcessorBulkSize" value="${es.bulkProcessor.bulkSize}" /> + <property name="bulkProcessorFlushInterval" value="${es.bulkProcessor.flushInterval}" /> + <property name="bulkProcessorBackoffPolicy" value="${es.bulkProcessor.backoffPolicy}" /> </bean> </blueprint> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a7fb48dd/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index fb058a4..c5d906f 100644 --- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -26,3 +26,9 @@ node.data=true discovery.zen.ping.multicast.enabled=false #discovery.zen.ping.unicast.hosts=["192.168.0.1:9300", "192.168.0.2:9300"] defaultQueryLimit=10 +bulkProcessor.name=unomi-bulk +bulkProcessor.concurrentRequests=1 +bulkProcessor.bulkActions=1000 +bulkProcessor.bulkSize=10MB +bulkProcessor.flushInterval=5s +bulkProcessor.backoffPolicy=exponential \ No newline at end of file
