[ https://issues.apache.org/jira/browse/NIFI-3011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671827#comment-15671827 ]
ASF GitHub Bot commented on NIFI-3011: -------------------------------------- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/1233#discussion_r88333819 --- Diff: nifi-nar-bundles/nifi-elasticsearch-5-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java --- @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.StringUtils; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.transport.client.PreBuiltTransportClient; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + + +abstract class AbstractElasticsearch5TransportClientProcessor extends AbstractElasticsearch5Processor { + + /** + * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries + */ + private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> { + final List<String> esList = Arrays.asList(input.split(",")); + for (String hostnamePort : esList) { + String[] addresses = hostnamePort.split(":"); + // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) + if (addresses.length != 2) { + return new ValidationResult.Builder().subject(subject).input(input).explanation( + "Must be in hostname:port form (no scheme such as http://").valid(false).build(); + } + } + return new ValidationResult.Builder().subject(subject).input(input).explanation( + "Valid cluster definition").valid(true).build(); + }; + + protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() + .name("el5-cluster-name") + .displayName("Cluster Name") + .description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("elasticsearch") + .build(); + + protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() + .name("el5-hosts") + .displayName("ElasticSearch Hosts") + .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port " + + "host1:port,host2:port,.... For example testcluster:9300. This processor uses the Transport Client to " + + "connect to hosts. The default transport client port is 9300.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(HOSTNAME_PORT_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_XPACK_LOCATION = new PropertyDescriptor.Builder() + .name("el5-xpack-location") + .displayName("X-Pack Transport Location") + .description("Specifies the path to the JAR(s) for the Elasticsearch X-Pack Transport feature. At a minimum, this must be a " + + "folder and/or comma-separated list of JARs that include x-pack-transport and x-pack-api JARs. " + + "If the Elasticsearch cluster has been secured with the X-Pack plugin, then the X-Pack Transport " + + "JARs must also be available to this processor. Note: Do NOT place the X-Pack JARs into NiFi's " + + "lib/ directory, doing so will prevent the X-Pack Transport JARs from being loaded.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamicallyModifiesClasspath(true) + .build(); + + protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder() + .name("el5-ping-timeout") + .displayName("ElasticSearch Ping Timeout") + .description("The ping timeout used to determine when a node is unreachable. " + + "For example, 5s (5 seconds). If non-local recommended is 30s") + .required(true) + .defaultValue("5s") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder() + .name("el5-sampler-interval") + .displayName("Sampler Interval") + .description("How often to sample / ping the nodes listed and connected. For example, 5s (5 seconds). " + + "If non-local recommended is 30s.") + .required(true) + .defaultValue("5s") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected AtomicReference<Client> esClient = new AtomicReference<>(); + protected List<InetSocketAddress> esHosts; + + /** + * Instantiate ElasticSearch Client. This should be called by subclasses' @OnScheduled method to create a client + * if one does not yet exist. If called when scheduled, closeClient() should be called by the subclasses' @OnStopped + * method so the client will be destroyed when the processor is stopped. + * + * @param context The context for this processor + * @throws ProcessException if an error occurs while creating an Elasticsearch client + */ + @Override + protected void createElasticsearchClient(ProcessContext context) throws ProcessException { + + ComponentLog log = getLogger(); + if (esClient.get() != null) { + return; + } + + log.debug("Creating ElasticSearch Client"); + try { + final String clusterName = context.getProperty(CLUSTER_NAME).getValue(); + final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue(); + final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue(); + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + + final SSLContextService sslService = + context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + Settings.Builder settingsBuilder = Settings.builder() + .put("cluster.name", clusterName) + .put("client.transport.ping_timeout", pingTimeout) + .put("client.transport.nodes_sampler_interval", samplerInterval); + + String xPackUrl = context.getProperty(PROP_XPACK_LOCATION).getValue(); + if (sslService != null) { + settingsBuilder.put("xpack.security.transport.ssl.enabled", "true") + .put("xpack.ssl.keystore.path", sslService.getKeyStoreFile()) + .put("xpack.ssl.keystore.password", sslService.getKeyStorePassword()) --- End diff -- "xpack.ssl.keystore.key_password" could also be set with "sslService.getKeyPassword()" > Support Elasticsearch 5.0 for Put/FetchElasticsearch processors > --------------------------------------------------------------- > > Key: NIFI-3011 > URL: https://issues.apache.org/jira/browse/NIFI-3011 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Matt Burgess > Assignee: Matt Burgess > > Now that Elastic has released a new major version (5.0) of Elasticsearch, the > Put/FetchElasticsearch processors would need to be upgraded (or duplicated) > as the major version of the transport client needs to match the major version > of the Elasticsearch cluster. > If upgrade is selected, then Put/FetchES will no longer work with > Elasticsearch 2.x clusters, so in that case users would want to switch to the > Http versions of those processors. However this might not be desirable (due > to performance concerns with the HTTP API vs the transport API), so care must > be taken when deciding whether to upgrade the existing processors or create > new ones. > Creating new versions of these processors (to use the 5.0 transport client) > will also take some consideration, as it is unlikely the different versions > can coexist in the same NAR due to classloading issues (multiple versions of > JARs containing the same class names, e.g.). It may be necessary to create an > "elasticsearch-5.0" version of the NAR, containing only the new versions of > these processors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)