[ 
https://issues.apache.org/jira/browse/NIFI-3011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671827#comment-15671827
 ] 

ASF GitHub Bot commented on NIFI-3011:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1233#discussion_r88333819
  
    --- Diff: 
nifi-nar-bundles/nifi-elasticsearch-5-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java
 ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.nifi.util.StringUtils;
    +import org.elasticsearch.client.Client;
    +import org.elasticsearch.client.transport.TransportClient;
    +import org.elasticsearch.common.settings.Settings;
    +import org.elasticsearch.common.transport.InetSocketTransportAddress;
    +import org.elasticsearch.transport.client.PreBuiltTransportClient;
    +
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.InvocationTargetException;
    +import java.lang.reflect.Method;
    +import java.net.InetSocketAddress;
    +import java.net.MalformedURLException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +
    +abstract class AbstractElasticsearch5TransportClientProcessor extends 
AbstractElasticsearch5Processor {
    +
    +    /**
    +     * This validator ensures the Elasticsearch hosts property is a valid 
list of hostname:port entries
    +     */
    +    private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, 
input, context) -> {
    +        final List<String> esList = Arrays.asList(input.split(","));
    +        for (String hostnamePort : esList) {
    +            String[] addresses = hostnamePort.split(":");
    +            // Protect against invalid input like http://127.0.0.1:9300 
(URL scheme should not be there)
    +            if (addresses.length != 2) {
    +                return new 
ValidationResult.Builder().subject(subject).input(input).explanation(
    +                        "Must be in hostname:port form (no scheme such as 
http://").valid(false).build();
    +            }
    +        }
    +        return new 
ValidationResult.Builder().subject(subject).input(input).explanation(
    +                "Valid cluster definition").valid(true).build();
    +    };
    +
    +    protected static final PropertyDescriptor CLUSTER_NAME = new 
PropertyDescriptor.Builder()
    +            .name("el5-cluster-name")
    +            .displayName("Cluster Name")
    +            .description("Name of the ES cluster (for example, 
elasticsearch_brew). Defaults to 'elasticsearch'")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .defaultValue("elasticsearch")
    +            .build();
    +
    +    protected static final PropertyDescriptor HOSTS = new 
PropertyDescriptor.Builder()
    +            .name("el5-hosts")
    +            .displayName("ElasticSearch Hosts")
    +            .description("ElasticSearch Hosts, which should be comma 
separated and colon for hostname/port "
    +                    + "host1:port,host2:port,....  For example 
testcluster:9300. This processor uses the Transport Client to "
    +                    + "connect to hosts. The default transport client port 
is 9300.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(HOSTNAME_PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_XPACK_LOCATION = new 
PropertyDescriptor.Builder()
    +            .name("el5-xpack-location")
    +            .displayName("X-Pack Transport Location")
    +            .description("Specifies the path to the JAR(s) for the 
Elasticsearch X-Pack Transport feature. At a minimum, this must be a "
    +                    + "folder and/or comma-separated list of JARs that 
include x-pack-transport and x-pack-api JARs. "
    +                    + "If the Elasticsearch cluster has been secured with 
the X-Pack plugin, then the X-Pack Transport "
    +                    + "JARs must also be available to this processor. 
Note: Do NOT place the X-Pack JARs into NiFi's "
    +                    + "lib/ directory, doing so will prevent the X-Pack 
Transport JARs from being loaded.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .dynamicallyModifiesClasspath(true)
    +            .build();
    +
    +    protected static final PropertyDescriptor PING_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("el5-ping-timeout")
    +            .displayName("ElasticSearch Ping Timeout")
    +            .description("The ping timeout used to determine when a node 
is unreachable. " +
    +                    "For example, 5s (5 seconds). If non-local recommended 
is 30s")
    +            .required(true)
    +            .defaultValue("5s")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    protected static final PropertyDescriptor SAMPLER_INTERVAL = new 
PropertyDescriptor.Builder()
    +            .name("el5-sampler-interval")
    +            .displayName("Sampler Interval")
    +            .description("How often to sample / ping the nodes listed and 
connected. For example, 5s (5 seconds). "
    +                    + "If non-local recommended is 30s.")
    +            .required(true)
    +            .defaultValue("5s")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    protected AtomicReference<Client> esClient = new AtomicReference<>();
    +    protected List<InetSocketAddress> esHosts;
    +
    +    /**
    +     * Instantiate ElasticSearch Client. This should be called by 
subclasses' @OnScheduled method to create a client
    +     * if one does not yet exist. If called when scheduled, closeClient() 
should be called by the subclasses' @OnStopped
    +     * method so the client will be destroyed when the processor is 
stopped.
    +     *
    +     * @param context The context for this processor
    +     * @throws ProcessException if an error occurs while creating an 
Elasticsearch client
    +     */
    +    @Override
    +    protected void createElasticsearchClient(ProcessContext context) 
throws ProcessException {
    +
    +        ComponentLog log = getLogger();
    +        if (esClient.get() != null) {
    +            return;
    +        }
    +
    +        log.debug("Creating ElasticSearch Client");
    +        try {
    +            final String clusterName = 
context.getProperty(CLUSTER_NAME).getValue();
    +            final String pingTimeout = 
context.getProperty(PING_TIMEOUT).getValue();
    +            final String samplerInterval = 
context.getProperty(SAMPLER_INTERVAL).getValue();
    +            final String username = 
context.getProperty(USERNAME).getValue();
    +            final String password = 
context.getProperty(PASSWORD).getValue();
    +
    +            final SSLContextService sslService =
    +                    
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +
    +            Settings.Builder settingsBuilder = Settings.builder()
    +                    .put("cluster.name", clusterName)
    +                    .put("client.transport.ping_timeout", pingTimeout)
    +                    .put("client.transport.nodes_sampler_interval", 
samplerInterval);
    +
    +            String xPackUrl = 
context.getProperty(PROP_XPACK_LOCATION).getValue();
    +            if (sslService != null) {
    +                
settingsBuilder.put("xpack.security.transport.ssl.enabled", "true")
    +                        .put("xpack.ssl.keystore.path", 
sslService.getKeyStoreFile())
    +                        .put("xpack.ssl.keystore.password", 
sslService.getKeyStorePassword())
    --- End diff --
    
    "xpack.ssl.keystore.key_password" could also be set with 
"sslService.getKeyPassword()"


> Support Elasticsearch 5.0 for Put/FetchElasticsearch processors
> ---------------------------------------------------------------
>
>                 Key: NIFI-3011
>                 URL: https://issues.apache.org/jira/browse/NIFI-3011
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>
> Now that Elastic has released a new major version (5.0) of Elasticsearch, the 
> Put/FetchElasticsearch processors would need to be upgraded (or duplicated) 
> as the major version of the transport client needs to match the major version 
> of the Elasticsearch cluster.
> If upgrade is selected, then Put/FetchES will no longer work with 
> Elasticsearch 2.x clusters, so in that case users would want to switch to the 
> Http versions of those processors. However this might not be desirable (due 
> to performance concerns with the HTTP API vs the transport API), so care must 
> be taken when deciding whether to upgrade the existing processors or create 
> new ones.
> Creating new versions of these processors (to use the 5.0 transport client) 
> will also take some consideration, as it is unlikely the different versions 
> can coexist in the same NAR due to classloading issues (e.g., multiple 
> versions of JARs containing the same class names). It may be necessary to 
> create an "elasticsearch-5.0" version of the NAR, containing only the new 
> versions of these processors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to