Repository: incubator-unomi
Updated Branches:
  refs/heads/master a4955cafa -> b1c88c4c4


UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates
- Validate configuration functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/a7fb48dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/a7fb48dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/a7fb48dd

Branch: refs/heads/master
Commit: a7fb48dd948d36e376a4fdfc8868fd51a9053bd0
Parents: 79b1b9c
Author: Serge Huber <[email protected]>
Authored: Tue Nov 15 14:13:46 2016 +0100
Committer: Serge Huber <[email protected]>
Committed: Tue Nov 15 14:13:46 2016 +0100

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 85 +++++++++++++-------
 .../resources/OSGI-INF/blueprint/blueprint.xml  | 13 +++
 ...g.apache.unomi.persistence.elasticsearch.cfg |  6 ++
 3 files changed, 77 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a7fb48dd/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index d13f7b6..2208951 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -117,12 +117,6 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
     public static final String CONTEXTSERVER_PORT = "contextserver.port";
     public static final String CONTEXTSERVER_SECURE_ADDRESS = 
"contextserver.secureAddress";
     public static final String CONTEXTSERVER_SECURE_PORT = 
"contextserver.securePort";
-    public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME 
= "contextserver.elasticsearch.bulkprocessor.name";
-    public static final String 
CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST = 
"contextserver.elasticsearch.bulkprocessor.concurrentRequests";
-    public static final String 
CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS = 
"contextserver.elasticsearch.bulkprocessor.bulkActions";
-    public static final String 
CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE = 
"contextserver.elasticsearch.bulkprocessor.bulkSize";
-    public static final String 
CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL = 
"contextserver.elasticsearch.bulkprocessor.flushInterval";
-    public static final String 
CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY = 
"contextserver.elasticsearch.bulkprocessor.backoffPolicy";
     public static final String KARAF_HOME = "karaf.home";
     public static final String ELASTICSEARCH_HOME_DIRECTORY = "elasticsearch";
     public static final String ELASTICSEARCH_PLUGINS_DIRECTORY = 
ELASTICSEARCH_HOME_DIRECTORY + "/plugins";
@@ -142,6 +136,12 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
     public static final String PATH_PLUGINS = "path.plugins";
     public static final String INDEX_MAX_RESULT_WINDOW = 
"index.max_result_window";
     public static final String MAPPER_ALLOW_DOTS_IN_NAME = 
"mapper.allow_dots_in_name";
+    public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name";
+    public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = 
"bulkProcessor.concurrentRequests";
+    public static final String BULK_PROCESSOR_BULK_ACTIONS = 
"bulkProcessor.bulkActions";
+    public static final String BULK_PROCESSOR_BULK_SIZE = 
"bulkProcessor.bulkSize";
+    public static final String BULK_PROCESSOR_FLUSH_INTERVAL = 
"bulkProcessor.flushInterval";
+    public static final String BULK_PROCESSOR_BACKOFF_POLICY = 
"bulkProcessor.backoffPolicy";
 
     private Node node;
     private Client client;
@@ -172,6 +172,13 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
 
     private Timer timer;
 
+    private String bulkProcessorName = "unomi-bulk";
+    private String bulkProcessorConcurrentRequests = "1";
+    private String bulkProcessorBulkActions = "1000";
+    private String bulkProcessorBulkSize= "5MB";
+    private String bulkProcessorFlushInterval = "5s";
+    private String bulkProcessorBackoffPolicy = "exponential";
+
     public void setBundleContext(BundleContext bundleContext) {
         this.bundleContext = bundleContext;
     }
@@ -252,6 +259,30 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
         this.conditionESQueryBuilderDispatcher = 
conditionESQueryBuilderDispatcher;
     }
 
+    public void setBulkProcessorName(String bulkProcessorName) {
+        this.bulkProcessorName = bulkProcessorName;
+    }
+
+    public void setBulkProcessorConcurrentRequests(String 
bulkProcessorConcurrentRequests) {
+        this.bulkProcessorConcurrentRequests = bulkProcessorConcurrentRequests;
+    }
+
+    public void setBulkProcessorBulkActions(String bulkProcessorBulkActions) {
+        this.bulkProcessorBulkActions = bulkProcessorBulkActions;
+    }
+
+    public void setBulkProcessorBulkSize(String bulkProcessorBulkSize) {
+        this.bulkProcessorBulkSize = bulkProcessorBulkSize;
+    }
+
+    public void setBulkProcessorFlushInterval(String 
bulkProcessorFlushInterval) {
+        this.bulkProcessorFlushInterval = bulkProcessorFlushInterval;
+    }
+
+    public void setBulkProcessorBackoffPolicy(String 
bulkProcessorBackoffPolicy) {
+        this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy;
+    }
+
     public void start() {
 
         loadPredefinedMappings(bundleContext, false);
@@ -282,6 +313,13 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                 secureAddress = 
System.getProperty(CONTEXTSERVER_SECURE_ADDRESS, secureAddress);
                 securePort = System.getProperty(CONTEXTSERVER_SECURE_PORT, 
securePort);
 
+                bulkProcessorName = System.getProperty(BULK_PROCESSOR_NAME, 
bulkProcessorName);
+                bulkProcessorConcurrentRequests = 
System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS, 
bulkProcessorConcurrentRequests);
+                bulkProcessorBulkActions = 
System.getProperty(BULK_PROCESSOR_BULK_ACTIONS, bulkProcessorBulkActions);
+                bulkProcessorBulkSize = 
System.getProperty(BULK_PROCESSOR_BULK_SIZE, bulkProcessorBulkSize);
+                bulkProcessorFlushInterval = 
System.getProperty(BULK_PROCESSOR_FLUSH_INTERVAL, bulkProcessorFlushInterval);
+                bulkProcessorBackoffPolicy = 
System.getProperty(BULK_PROCESSOR_BACKOFF_POLICY, bulkProcessorBackoffPolicy);
+
                 Settings.Builder settingsBuilder = Settings.builder();
                 if (settings != null) {
                     settingsBuilder.put(settings);
@@ -430,37 +468,30 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                         // we could add index creation here in the case of 
index separation by dates.
                     }
                 });
-        if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME) 
!= null) {
-            String name = 
System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME);
-            if (name != null && name.length() > 0) {
-                bulkProcessorBuilder.setName(name);
-            }
+        if (bulkProcessorName != null && bulkProcessorName.length() > 0) {
+            bulkProcessorBuilder.setName(bulkProcessorName);
         }
-        if 
(System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST)
 != null) {
-            String concurrentRequestsStr = 
System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST);
-            int concurrentRequests = Integer.parseInt(concurrentRequestsStr);
+        if (bulkProcessorConcurrentRequests != null) {
+            int concurrentRequests = 
Integer.parseInt(bulkProcessorConcurrentRequests);
             if (concurrentRequests > 1) {
                 bulkProcessorBuilder.setConcurrentRequests(concurrentRequests);
             }
         }
-        if 
(System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS) != 
null) {
-            String bulkActionsStr = 
System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS);
-            int bulkActions = Integer.parseInt(bulkActionsStr);
+        if (bulkProcessorBulkActions != null) {
+            int bulkActions = Integer.parseInt(bulkProcessorBulkActions);
             bulkProcessorBuilder.setBulkActions(bulkActions);
         }
-        if 
(System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE) != 
null) {
-            String bulkSizeStr = 
System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE);
-            
bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkSizeStr, 
new ByteSizeValue(5, ByteSizeUnit.MB), 
CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE));
+        if (bulkProcessorBulkSize != null) {
+            
bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkProcessorBulkSize,
 new ByteSizeValue(5, ByteSizeUnit.MB), BULK_PROCESSOR_BULK_SIZE));
         }
-        if 
(System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL) != 
null) {
-            String flushIntervalStr = 
System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL);
-            
bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(flushIntervalStr,
 null, CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL));
+        if (bulkProcessorFlushInterval != null) {
+            
bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(bulkProcessorFlushInterval,
 null, BULK_PROCESSOR_FLUSH_INTERVAL));
         } else {
             // in ElasticSearch this defaults to null, but we would like to 
set a value to 5 seconds by default
             bulkProcessorBuilder.setFlushInterval(new TimeValue(5, 
TimeUnit.SECONDS));
         }
-        if 
(System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY) != 
null) {
-            String backoffPolicyStr = 
System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY);
+        if (bulkProcessorBackoffPolicy != null) {
+            String backoffPolicyStr = bulkProcessorBackoffPolicy;
             if (backoffPolicyStr != null && backoffPolicyStr.length() > 0) {
                 backoffPolicyStr = backoffPolicyStr.toLowerCase();
                 if ("nobackoff".equals(backoffPolicyStr)) {
@@ -469,7 +500,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
                     int paramStartPos = backoffPolicyStr.indexOf("constant(" + 
"constant(".length());
                     int paramEndPos = backoffPolicyStr.indexOf(")", 
paramStartPos);
                     int paramSeparatorPos = backoffPolicyStr.indexOf(",", 
paramStartPos);
-                    TimeValue delay = 
TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, 
paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), 
CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY );
+                    TimeValue delay = 
TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, 
paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), 
BULK_PROCESSOR_BACKOFF_POLICY);
                     int maxNumberOfRetries = 
Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos));
                     
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, 
maxNumberOfRetries));
                 } else if (backoffPolicyStr.startsWith("exponential")) {
@@ -480,7 +511,7 @@ public class ElasticSearchPersistenceServiceImpl implements 
PersistenceService,
                         int paramStartPos = 
backoffPolicyStr.indexOf("exponential(" + "exponential(".length());
                         int paramEndPos = backoffPolicyStr.indexOf(")", 
paramStartPos);
                         int paramSeparatorPos = backoffPolicyStr.indexOf(",", 
paramStartPos);
-                        TimeValue delay = 
TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, 
paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), 
CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY );
+                        TimeValue delay = 
TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, 
paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), 
BULK_PROCESSOR_BACKOFF_POLICY);
                         int maxNumberOfRetries = 
Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos));
                         
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, 
maxNumberOfRetries));
                     }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a7fb48dd/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5fbc9dd..75039d8 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ 
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -40,6 +40,13 @@
             <cm:property name="discovery.zen.ping.multicast.enabled" 
value="false"/>
             <cm:property name="node.data" value="true"/>
             <cm:property name="defaultQueryLimit" value="10"/>
+
+            <cm:property name="bulkProcessor.name" value="unomi-bulk" />
+            <cm:property name="bulkProcessor.concurrentRequests" value="1" />
+            <cm:property name="bulkProcessor.bulkActions" value="1000" />
+            <cm:property name="bulkProcessor.bulkSize" value="5MB" />
+            <cm:property name="bulkProcessor.flushInterval" value="5s" />
+            <cm:property name="bulkProcessor.backoffPolicy" 
value="exponential" />
         </cm:default-properties>
     </cm:property-placeholder>
 
@@ -106,6 +113,12 @@
                 <entry key="geonameEntry" value="geonames"/>
             </map>
         </property>
+        <property name="bulkProcessorName" value="${es.bulkProcessor.name}" />
+        <property name="bulkProcessorConcurrentRequests" 
value="${es.bulkProcessor.concurrentRequests}" />
+        <property name="bulkProcessorBulkActions" 
value="${es.bulkProcessor.bulkActions}" />
+        <property name="bulkProcessorBulkSize" 
value="${es.bulkProcessor.bulkSize}" />
+        <property name="bulkProcessorFlushInterval" 
value="${es.bulkProcessor.flushInterval}" />
+        <property name="bulkProcessorBackoffPolicy" 
value="${es.bulkProcessor.backoffPolicy}" />
     </bean>
 
 </blueprint>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a7fb48dd/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index fb058a4..c5d906f 100644
--- 
a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ 
b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -26,3 +26,9 @@ node.data=true
 discovery.zen.ping.multicast.enabled=false
 #discovery.zen.ping.unicast.hosts=["192.168.0.1:9300", "192.168.0.2:9300"]
 defaultQueryLimit=10
+bulkProcessor.name=unomi-bulk
+bulkProcessor.concurrentRequests=1
+bulkProcessor.bulkActions=1000
+bulkProcessor.bulkSize=10MB
+bulkProcessor.flushInterval=5s
+bulkProcessor.backoffPolicy=exponential
\ No newline at end of file

Reply via email to