UNOMI-63 Use ElasticSearch BulkProcessor to perform segment updates - Fix merge issues with master - Implement bulk processor support for ElasticSearch update operations (indexing operations are not yet integrated because of IndexNotFoundException handling) - Minor logging improvements in SegmentService
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/285d4cc6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/285d4cc6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/285d4cc6 Branch: refs/heads/UNOMI-28-ES-2-X-UPGRADE Commit: 285d4cc6dbcce938f094188fa3809f93fb9dc52b Parents: f0bd45f Author: Serge Huber <[email protected]> Authored: Mon Nov 14 17:25:05 2016 +0100 Committer: Serge Huber <[email protected]> Committed: Mon Nov 14 17:25:05 2016 +0100 ---------------------------------------------------------------------- package/pom.xml | 5 +- .../ElasticSearchPersistenceServiceImpl.java | 133 +++++++++++++++++-- .../services/services/SegmentServiceImpl.java | 8 +- 3 files changed, 130 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/285d4cc6/package/pom.xml ---------------------------------------------------------------------- diff --git a/package/pom.xml b/package/pom.xml index 5de850b..d83f4c3 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -123,9 +123,6 @@ <profiles> <profile> <id>src</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> <build> <plugins> <plugin> @@ -308,7 +305,7 @@ </plugin> </plugins> </build> + </profile> </profiles> - </project> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/285d4cc6/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 16f1bc6..d13f7b6 
100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -40,8 +40,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.deletebyquery.DeleteByQueryAction; import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; @@ -51,12 +50,15 @@ import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; @@ -101,6 +103,7 @@ import java.nio.file.Paths; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; +import 
java.util.concurrent.TimeUnit; import static org.elasticsearch.node.NodeBuilder.nodeBuilder; @@ -114,6 +117,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public static final String CONTEXTSERVER_PORT = "contextserver.port"; public static final String CONTEXTSERVER_SECURE_ADDRESS = "contextserver.secureAddress"; public static final String CONTEXTSERVER_SECURE_PORT = "contextserver.securePort"; + public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME = "contextserver.elasticsearch.bulkprocessor.name"; + public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST = "contextserver.elasticsearch.bulkprocessor.concurrentRequests"; + public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS = "contextserver.elasticsearch.bulkprocessor.bulkActions"; + public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE = "contextserver.elasticsearch.bulkprocessor.bulkSize"; + public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL = "contextserver.elasticsearch.bulkprocessor.flushInterval"; + public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY = "contextserver.elasticsearch.bulkprocessor.backoffPolicy"; public static final String KARAF_HOME = "karaf.home"; public static final String ELASTICSEARCH_HOME_DIRECTORY = "elasticsearch"; public static final String ELASTICSEARCH_PLUGINS_DIRECTORY = ELASTICSEARCH_HOME_DIRECTORY + "/plugins"; @@ -136,6 +145,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private Node node; private Client client; + private BulkProcessor bulkProcessor; private String clusterName; private String indexName; private String monthlyIndexNumberOfShards; @@ -351,6 +361,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, getMonthlyIndex(new Date(), true); + if (client != null && bulkProcessor == null) { + 
bulkProcessor = getBulkProcessor(); + } + return null; } }.executeInClassLoader(); @@ -388,11 +402,104 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.info(this.getClass().getName() + " service started successfully."); } + public BulkProcessor getBulkProcessor() { // lazily builds the shared processor, configured from the contextserver.elasticsearch.bulkprocessor.* system properties + if (bulkProcessor != null) { + return bulkProcessor; + } + BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder( + client, + new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, + BulkRequest request) { + logger.debug("Before Bulk"); + } + + @Override + public void afterBulk(long executionId, + BulkRequest request, + BulkResponse response) { + if (response.hasFailures()) { logger.error("After Bulk (item failures): " + response.buildFailureMessage()); } else { logger.debug("After Bulk"); } // per-item failures are reported in the response, not through the Throwable callback + } + + @Override + public void afterBulk(long executionId, + BulkRequest request, + Throwable failure) { + logger.error("After Bulk (failure)", failure); + // we could add index creation here in the case of index separation by dates. + } + }); + if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME) != null) { + String name = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME); + if (name != null && name.length() > 0) { + bulkProcessorBuilder.setName(name); + } + } + if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST) != null) { + String concurrentRequestsStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST); + int concurrentRequests = Integer.parseInt(concurrentRequestsStr); + if (concurrentRequests >= 0) { // 0 (no concurrent requests, i.e. synchronous flushes) is a valid ElasticSearch setting and must not be ignored + bulkProcessorBuilder.setConcurrentRequests(concurrentRequests); + } + } + if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS) != null) { + String bulkActionsStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS); + int bulkActions = Integer.parseInt(bulkActionsStr); + bulkProcessorBuilder.setBulkActions(bulkActions); + } + if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE) !=
null) { + String bulkSizeStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE); + bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkSizeStr, new ByteSizeValue(5, ByteSizeUnit.MB), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE)); + } + if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL) != null) { + String flushIntervalStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL); + bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(flushIntervalStr, null, CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL)); + } else { + // in ElasticSearch this defaults to null, but we would like to set a value to 5 seconds by default + bulkProcessorBuilder.setFlushInterval(new TimeValue(5, TimeUnit.SECONDS)); + } + if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY) != null) { + String backoffPolicyStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY); + if (backoffPolicyStr != null && backoffPolicyStr.length() > 0) { + backoffPolicyStr = backoffPolicyStr.trim().toLowerCase(Locale.ROOT); // Locale.ROOT avoids locale-dependent casing (e.g. Turkish dotless i for an upper-case "EXPONENTIAL") + if ("nobackoff".equals(backoffPolicyStr)) { + bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.noBackoff()); + } else if (backoffPolicyStr.startsWith("constant(")) { + int paramStartPos = backoffPolicyStr.indexOf("constant(") + "constant(".length(); // first char after the opening parenthesis + int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos); + int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos); + TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos).trim(), new TimeValue(5, TimeUnit.SECONDS), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY ); + int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos).trim()); + bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, maxNumberOfRetries)); + } else if (backoffPolicyStr.startsWith("exponential")) {
+ if (!backoffPolicyStr.contains("(")) { + bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff()); + } else { + // we detected parameters, must process them. + int paramStartPos = backoffPolicyStr.indexOf("exponential(") + "exponential(".length(); // first char after the opening parenthesis + int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos); + int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos); + TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos).trim(), new TimeValue(5, TimeUnit.SECONDS), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY ); + int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos).trim()); + bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries)); + } + } + } + } + + bulkProcessor = bulkProcessorBuilder.build(); + return bulkProcessor; + } + public void stop() { new InClassLoaderExecute<Object>() { protected Object execute(Object... args) { logger.info("Closing ElasticSearch persistence backend..."); + if (bulkProcessor != null) { + try { if (!bulkProcessor.awaitClose(1, TimeUnit.MINUTES)) { logger.warn("Timed out waiting for pending bulk requests to complete"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while waiting for pending bulk requests to complete", e); } // close() does not wait for in-flight async bulks, which node.close() could then abort + } node.close(); return null; } @@ -605,9 +712,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : (itemsMonthlyIndexed.contains(itemType) && dateHint != null ?
getMonthlyIndex(dateHint) : indexName); - client.prepareUpdate(index, itemType, itemId).setDoc(source) - .execute() - .actionGet(); + if (bulkProcessor == null) { + client.prepareUpdate(index, itemType, itemId).setDoc(source) + .execute() + .actionGet(); + } else { + UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setDoc(source).request(); + bulkProcessor.add(updateRequest); + } return true; } catch (IndexNotFoundException e) { logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); @@ -632,9 +744,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName); Script actualScript = new Script(script, ScriptService.ScriptType.INLINE, null, scriptParams); - client.prepareUpdate(index, itemType, itemId).setScript(actualScript) - .execute() - .actionGet(); + if (bulkProcessor == null) { + client.prepareUpdate(index, itemType, itemId).setScript(actualScript) + .execute() + .actionGet(); + } else { + UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).request(); + bulkProcessor.add(updateRequest); + } return true; } catch (IndexNotFoundException e) { logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e); http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/285d4cc6/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java ---------------------------------------------------------------------- diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java index 7263b28..06b83b0 100644 --- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java @@ -812,7 
+812,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList } } - logger.info("Profiles past condition updated in {}", System.currentTimeMillis()-t); + logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis()-t); } private void updateExistingProfilesForSegment(Segment segment) { @@ -854,7 +854,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments()); } } - logger.info("Profiles updated in {}", System.currentTimeMillis()-t); + logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t); } private void updateExistingProfilesForScoring(Scoring scoring) { @@ -888,7 +888,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList eventService.send(entries.next().getValue()); } } - logger.info("Profiles updated in {}", System.currentTimeMillis()-t); + logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t); } private void updateExistingProfilesForRemovedScoring(String scoringId) { @@ -905,7 +905,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList for (Profile profileToRemove : previousProfiles) { persistenceService.updateWithScript(profileToRemove.getItemId(), null, Profile.class, "ctx._source.scores.remove(scoringId)", scriptParams); } - logger.info("Profiles updated in {}", System.currentTimeMillis()-t); + logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t); } private String getMD5(String md5) {
