This is an automated email from the ASF dual-hosted git repository. shuber pushed a commit to branch UNOMI-225-ES7 in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 0fde2a80b604d9da43783d0d3c80e36434c1fb55 Author: Serge Huber <[email protected]> AuthorDate: Sun Apr 7 15:27:57 2019 +0200 UNOMI-225 Initial work on ElasticSearch 7 support. Includes switching to Java High Level Rest Client This is very prelimiinary work, it doesn't even compile yet. Signed-off-by: Serge Huber <[email protected]> --- persistence-elasticsearch/core/pom.xml | 11 +- .../ElasticSearchPersistenceServiceImpl.java | 193 ++++++--------------- 2 files changed, 58 insertions(+), 146 deletions(-) diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml index 19fb68f..395c2c0 100644 --- a/persistence-elasticsearch/core/pom.xml +++ b/persistence-elasticsearch/core/pom.xml @@ -60,16 +60,9 @@ <version>20.0</version> </dependency> <dependency> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - <version>${elasticsearch.version}</version> - <scope>compile</scope> - </dependency> - <dependency> <groupId>org.elasticsearch.client</groupId> - <artifactId>transport</artifactId> - <version>${elasticsearch.version}</version> - <scope>compile</scope> + <artifactId>elasticsearch-rest-high-level-client</artifactId> + <version>7.0.0-beta1</version> </dependency> <!-- The following are optional dependencies from the ElasticSearch that are made mandatory --> diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 1ab84ff..c579abf 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.unomi.persistence.elasticsearch; import com.hazelcast.core.HazelcastInstance; import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; import org.apache.unomi.api.Item; import org.apache.unomi.api.PartialList; import org.apache.unomi.api.TimestampedItem; @@ -31,6 +32,7 @@ import org.apache.unomi.metrics.MetricsService; import org.apache.unomi.persistence.elasticsearch.conditions.*; import org.apache.unomi.persistence.spi.PersistenceService; import org.apache.unomi.persistence.spi.aggregate.*; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; @@ -38,10 +40,8 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRespon import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; -import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -49,12 +49,12 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.client.*; +import org.elasticsearch.client.core.AcknowledgedResponse; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.DistanceUnit; @@ -81,30 +81,23 @@ import org.elasticsearch.search.aggregations.bucket.global.Global; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.range.IpRangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.osgi.framework.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; -import java.io.File; import java.io.IOException; import java.io.InputStreamReader; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.InetAddress; -import java.net.MalformedURLException; import java.net.URL; -import java.net.UnknownHostException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; @@ -126,7 +119,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval"; public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy"; private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); - private TransportClient client; + private RestHighLevelClient client; private BulkProcessor bulkProcessor; private String elasticSearchAddresses; private List<String> elasticSearchAddressList = new ArrayList<>(); @@ -158,10 +151,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private int aggregateQueryBucketSize = 5000; - private String transportClientClassName = null; - private String transportClientProperties = null; - private String transportClientJarDirectory = null; - private MetricsService metricsService; private HazelcastInstance hazelcastInstance; private Set<String> itemClassesToCacheSet = new HashSet<>(); @@ -263,18 +252,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, this.aggregateQueryBucketSize = aggregateQueryBucketSize; } - public void setTransportClientClassName(String transportClientClassName) { - this.transportClientClassName = transportClientClassName; - } - - public void setTransportClientProperties(String transportClientProperties) { - this.transportClientProperties = transportClientProperties; - } - - public void setTransportClientJarDirectory(String transportClientJarDirectory) { - this.transportClientJarDirectory = transportClientJarDirectory; - } - public void setMetricsService(MetricsService metricsService) { this.metricsService = metricsService; } @@ -327,30 +304,21 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, Settings.Builder transportSettings = Settings.builder() .put(CLUSTER_NAME, clusterName); - if (StringUtils.isNotBlank(transportClientClassName) && StringUtils.isNotBlank(transportClientJarDirectory)) { - logger.info("Connecting to ElasticSearch persistence backend using transport class " + transportClientClassName + - " with JAR directory "+transportClientJarDirectory + - " using cluster name " + clusterName + " and index name " + indexName + "..."); - client = newTransportClient(transportSettings, transportClientClassName, transportClientJarDirectory, transportClientProperties); - } else { - logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index name " + indexName + "..."); - client = new PreBuiltTransportClient(transportSettings.build()); - } + List<Node> nodeList = new ArrayList<>(); for (String elasticSearchAddress : elasticSearchAddressList) { String[] elasticSearchAddressParts = elasticSearchAddress.split(":"); String elasticSearchHostName = elasticSearchAddressParts[0]; int elasticSearchPort = Integer.parseInt(elasticSearchAddressParts[1]); - try { - client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticSearchHostName), elasticSearchPort)); - } catch (UnknownHostException e) { - String message = "Error resolving address " + elasticSearchAddress + " ElasticSearch transport client not connected"; - throw new Exception(message, e); - } + nodeList.add(new Node(new HttpHost(elasticSearchHostName, elasticSearchPort, "http"))); } + logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index name " + indexName + "..."); + client = new RestHighLevelClient( + RestClient.builder(nodeList.toArray(new Node[nodeList.size()]))); + // let's now check the versions of all the nodes in the cluster, to make sure they are as expected. try { - NodesInfoResponse nodesInfoResponse = client.admin().cluster().prepareNodesInfo() + NodesInfoResponse nodesInfoResponse = client.cluster().prepareNodesInfo() .all().execute().get(); org.elasticsearch.Version minimalVersion = org.elasticsearch.Version.fromString(minimalElasticSearchVersion); @@ -384,8 +352,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, while (!indexExists && tries < 20) { - IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet(); - indexExists = indicesExistsResponse.isExists(); + indexExists = client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); tries++; try { Thread.sleep(100); @@ -419,9 +386,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, logger.info("Waiting for GREEN cluster status..."); - client.admin().cluster().prepareHealth() - .setWaitForGreenStatus() - .get(); + client.cluster().health(new ClusterHealthRequest().waitForGreenStatus(), RequestOptions.DEFAULT); logger.info("Cluster status is GREEN"); @@ -438,29 +403,32 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (bulkProcessor != null) { return bulkProcessor; } - BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder( - client, - new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, - BulkRequest request) { - logger.debug("Before Bulk"); - } + BulkProcessor.Listener bulkProcessorListener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, + BulkRequest request) { + logger.debug("Before Bulk"); + } - @Override - public void afterBulk(long executionId, - BulkRequest request, - BulkResponse response) { - logger.debug("After Bulk"); - } + @Override + public void afterBulk(long executionId, + BulkRequest request, + BulkResponse response) { + logger.debug("After Bulk"); + } + + @Override + public void afterBulk(long executionId, + BulkRequest request, + Throwable failure) { + logger.error("After Bulk (failure)", failure); + } + }; + BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder( + (request, bulkListener) -> + client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), + bulkProcessorListener); - @Override - public void afterBulk(long executionId, - BulkRequest request, - Throwable failure) { - logger.error("After Bulk (failure)", failure); - } - }); if (bulkProcessorConcurrentRequests != null) { int concurrentRequests = Integer.parseInt(bulkProcessorConcurrentRequests); if (concurrentRequests > 1) { @@ -778,7 +746,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, for (int i = 0; i < scripts.length; i++) { Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]); - client.admin().indices().prepareRefresh(index).get(); + client.indices().prepareRefresh(index).get(); UpdateByQueryRequestBuilder ubqrb = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); ubqrb.source(index).source().setTypes(itemType); @@ -935,7 +903,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean indexTemplateExists(final String templateName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists") { protected Boolean execute(Object... args) { - GetIndexTemplatesResponse getIndexTemplatesResponse = client.admin().indices().prepareGetTemplates(templateName).execute().actionGet(); + GetIndexTemplatesResponse getIndexTemplatesResponse = client.indices().prepareGetTemplates(templateName).execute().actionGet(); return getIndexTemplatesResponse.getIndexTemplates().size() == 1; } }.catchingExecuteInClassLoader(true); @@ -949,7 +917,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean removeIndexTemplate(final String templateName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate") { protected Boolean execute(Object... args) { - DeleteIndexTemplateResponse deleteIndexTemplateResponse = client.admin().indices().deleteTemplate(new DeleteIndexTemplateRequest(templateName)).actionGet(); + AcknowledgedResponse deleteIndexTemplateResponse = client.indices().deleteTemplate(new DeleteIndexTemplateRequest(templateName)).actionGet(); return deleteIndexTemplateResponse.isAcknowledged(); } }.catchingExecuteInClassLoader(true); @@ -988,7 +956,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } putIndexTemplateRequest.mappings().putAll(indexMappings); - PutIndexTemplateResponse putIndexTemplateResponse = client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet(); + AcknowledgedResponse putIndexTemplateResponse = client.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT); return putIndexTemplateResponse.isAcknowledged(); } }.catchingExecuteInClassLoader(true); @@ -1002,7 +970,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean createIndex(final String indexName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createItem") { protected Boolean execute(Object... args) { - IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet(); + IndicesExistsResponse indicesExistsResponse = client.indices().prepareExists(indexName).execute().actionGet(); boolean indexExists = indicesExistsResponse.isExists(); if (!indexExists) { Map<String, String> indexMappings = new HashMap<String, String>(); @@ -1027,10 +995,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, public boolean removeIndex(final String indexName) { Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex") { protected Boolean execute(Object... args) { - IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet(); + IndicesExistsResponse indicesExistsResponse = client.indices().prepareExists(indexName).execute().actionGet(); boolean indexExists = indicesExistsResponse.isExists(); if (indexExists) { - client.admin().indices().prepareDelete(indexName).execute().actionGet(); + client.indices().prepareDelete(indexName).execute().actionGet(); } return indexExists; } @@ -1043,7 +1011,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } private void internalCreateIndex(String indexName, Map<String, String> mappings) { - CreateIndexRequestBuilder builder = client.admin().indices().prepareCreate(indexName) + CreateIndexRequestBuilder builder = client.indices().prepareCreate(indexName) .setSettings("{\n" + " \"index\" : {\n" + " \"number_of_shards\" : " + numberOfShards + ",\n" + @@ -1070,7 +1038,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private void createMapping(final String type, final String source, final String indexName) { - client.admin().indices() + client.indices() .preparePutMapping(indexName) .setType(type) .setSource(source, XContentType.JSON) @@ -1084,11 +1052,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } if (itemsMonthlyIndexed.contains(type)) { createMonthlyIndexTemplate(); - if (client.admin().indices().prepareExists(indexName + "-*").execute().actionGet().isExists()) { + if (client.indices().prepareExists(indexName + "-*").execute().actionGet().isExists()) { createMapping(type, source, indexName + "-*"); } } else if (indexNames.containsKey(type)) { - if (client.admin().indices().prepareExists(indexNames.get(type)).execute().actionGet().isExists()) { + if (client.indices().prepareExists(indexNames.get(type)).execute().actionGet().isExists()) { createMapping(type, source, indexNames.get(type)); } } else { @@ -1102,7 +1070,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @SuppressWarnings("unchecked") protected Map<String, Map<String, Object>> execute(Object... args) throws Exception { // Get all mapping for current itemType - GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings().setTypes(itemType).execute().actionGet(); + GetMappingsResponse getMappingsResponse = client.indices().prepareGetMappings().setTypes(itemType).execute().actionGet(); ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getMappingsResponse.getMappings(); // create a list of Keys to get the mappings in chronological order @@ -1701,7 +1669,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (bulkProcessor != null) { bulkProcessor.flush(); } - client.admin().indices().refresh(Requests.refreshRequest()).actionGet(); + client.indices().refresh(Requests.refreshRequest(), RequestOptions.DEFAULT); return true; } }.catchingExecuteInClassLoader(true); @@ -1714,7 +1682,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate") { @Override protected Object execute(Object... args) throws Exception { - IndicesStatsResponse statsResponse = client.admin().indices().prepareStats(indexName + "-*") + IndicesStatsResponse statsResponse = client.indices().prepareStats(indexName + "-*") .setIndexing(false) .setGet(false) .setSearch(false) @@ -1905,55 +1873,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - public TransportClient newTransportClient(Settings.Builder settingsBuilder, - String transportClientClassName, - String transportClientJarDirectory, - String transportClientProperties) { - - ArrayList<URL> urls = new ArrayList<>(); - File pluginLocationFile = new File(transportClientJarDirectory); - - File[] pluginLocationFiles = pluginLocationFile.listFiles(); - for (File pluginFile : pluginLocationFiles) { - if (pluginFile.getName().toLowerCase().endsWith(".jar")) { - try { - urls.add(pluginFile.toURI().toURL()); - } catch (MalformedURLException e) { - e.printStackTrace(); - } - } - } - - ChildFirstClassLoader childFirstClassLoader = new ChildFirstClassLoader(this.getClass().getClassLoader(), urls.toArray(new URL[urls.size()])); - - if (StringUtils.isNotBlank(transportClientProperties)) { - String[] clientProperties = transportClientProperties.split(","); - if (clientProperties.length > 0) { - for (String clientProperty : clientProperties) { - String[] clientPropertyParts = clientProperty.split("="); - settingsBuilder.put(clientPropertyParts[0], clientPropertyParts[1]); - } - } - } - - try { - Class<?> transportClientClass = childFirstClassLoader.loadClass(transportClientClassName); - Constructor<?> transportClientConstructor = transportClientClass.getConstructor(Settings.class, Class[].class); - return (TransportClient) transportClientConstructor.newInstance(settingsBuilder.build(), new Class[0]); - } catch (ClassNotFoundException e) { - logger.error("Couldn't find class " + transportClientClassName, e); - } catch (NoSuchMethodException e) { - logger.error("Error creating transport client with class" + transportClientClassName, e); - } catch (IllegalAccessException e) { - logger.error("Error creating transport client with class" + transportClientClassName, e); - } catch (InstantiationException e) { - logger.error("Error creating transport client with class" + transportClientClassName, e); - } catch (InvocationTargetException e) { - logger.error("Error creating transport client with class" + transportClientClassName, e); - } - return null; - } - private <T extends Item> boolean isCacheActiveForClass(String className) { if (itemClassesToCacheSet.contains("*")) { return true;
