Repository: incubator-unomi
Updated Branches:
  refs/heads/feature-UNOMI-28-ES2X 661daeea5 -> 1075a02cf
UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates
- Switch to TransportClient to hopefully make load-balancing work better. This might have a minor slowdown on standalone performance.

Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/1075a02c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/1075a02c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/1075a02c

Branch: refs/heads/feature-UNOMI-28-ES2X
Commit: 1075a02cfeb6a4c59d935185a3f1d81fc5159f06
Parents: 661daee
Author: Serge Huber <[email protected]>
Authored: Wed Nov 23 17:12:16 2016 +0100
Committer: Serge Huber <[email protected]>
Committed: Wed Nov 23 17:12:16 2016 +0100

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 26 ++++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/1075a02c/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 733b3a9..794b03b 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -53,10 +53,12 @@ import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
+import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsException;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.DistanceUnit;
@@ -96,9 +98,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
+import java.net.*;
 import java.nio.file.Paths;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -146,6 +146,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     public static final String ELASTICSEARCH_NETWORK_HOST = "network.host";
 
     private Node node;
+    private Client nodeClient;
     private Client client;
     private BulkProcessor bulkProcessor;
     private String clusterName;
@@ -356,15 +357,26 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 }
 
                 node = nodeBuilder().settings(settingsBuilder).node();
-                client = node.client();
+                nodeClient = node.client();
+
                 logger.info("Waiting for ElasticSearch to start...");
 
-                client.admin().cluster().prepareHealth()
+                nodeClient.admin().cluster().prepareHealth()
                         .setWaitForGreenStatus()
                         .get();
 
                 logger.info("Cluster status is GREEN");
 
+                try {
+                    Settings transportSettings = Settings.settingsBuilder()
+                            .put(CLUSTER_NAME, clusterName).build();
+                    client = TransportClient.builder().settings(transportSettings).build()
+                            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(address), 9300));
+                } catch (UnknownHostException e) {
+                    logger.error("Error resolving address " + address + " ElasticSearch transport client not connected, using internal client instead", e);
+                    client = nodeClient;
+                }
+
                 // @todo is there a better way to detect index existence than to wait for it to startup ?
                 boolean indexExists = false;
                 int tries = 0;
@@ -570,6 +582,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         logger.error("Error waiting for bulk operations to flush !", e);
                     }
                 }
+                if (nodeClient != client) {
+                    client.close();
+                }
+                nodeClient.close();
                 node.close();
                 return null;
             }
