[
https://issues.apache.org/jira/browse/NIFI-3011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671827#comment-15671827
]
ASF GitHub Bot commented on NIFI-3011:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1233#discussion_r88333819
--- Diff:
nifi-nar-bundles/nifi-elasticsearch-5-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.StringUtils;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+abstract class AbstractElasticsearch5TransportClientProcessor extends
AbstractElasticsearch5Processor {
+
+ /**
+ * This validator ensures the Elasticsearch hosts property is a valid
list of hostname:port entries
+ */
+ private static final Validator HOSTNAME_PORT_VALIDATOR = (subject,
input, context) -> {
+ final List<String> esList = Arrays.asList(input.split(","));
+ for (String hostnamePort : esList) {
+ String[] addresses = hostnamePort.split(":");
+ // Protect against invalid input like http://127.0.0.1:9300
(URL scheme should not be there)
+ if (addresses.length != 2) {
+ return new
ValidationResult.Builder().subject(subject).input(input).explanation(
+ "Must be in hostname:port form (no scheme such as
http://").valid(false).build();
+ }
+ }
+ return new
ValidationResult.Builder().subject(subject).input(input).explanation(
+ "Valid cluster definition").valid(true).build();
+ };
+
+ protected static final PropertyDescriptor CLUSTER_NAME = new
PropertyDescriptor.Builder()
+ .name("el5-cluster-name")
+ .displayName("Cluster Name")
+ .description("Name of the ES cluster (for example,
elasticsearch_brew). Defaults to 'elasticsearch'")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("elasticsearch")
+ .build();
+
+ protected static final PropertyDescriptor HOSTS = new
PropertyDescriptor.Builder()
+ .name("el5-hosts")
+ .displayName("ElasticSearch Hosts")
+ .description("ElasticSearch Hosts, which should be comma
separated and colon for hostname/port "
+ + "host1:port,host2:port,.... For example
testcluster:9300. This processor uses the Transport Client to "
+ + "connect to hosts. The default transport client port
is 9300.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .addValidator(HOSTNAME_PORT_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROP_XPACK_LOCATION = new
PropertyDescriptor.Builder()
+ .name("el5-xpack-location")
+ .displayName("X-Pack Transport Location")
+ .description("Specifies the path to the JAR(s) for the
Elasticsearch X-Pack Transport feature. At a minimum, this must be a "
+ + "folder and/or comma-separated list of JARs that
include x-pack-transport and x-pack-api JARs. "
+ + "If the Elasticsearch cluster has been secured with
the X-Pack plugin, then the X-Pack Transport "
+ + "JARs must also be available to this processor.
Note: Do NOT place the X-Pack JARs into NiFi's "
+ + "lib/ directory, doing so will prevent the X-Pack
Transport JARs from being loaded.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dynamicallyModifiesClasspath(true)
+ .build();
+
+ protected static final PropertyDescriptor PING_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("el5-ping-timeout")
+ .displayName("ElasticSearch Ping Timeout")
+ .description("The ping timeout used to determine when a node
is unreachable. " +
+ "For example, 5s (5 seconds). If non-local recommended
is 30s")
+ .required(true)
+ .defaultValue("5s")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected static final PropertyDescriptor SAMPLER_INTERVAL = new
PropertyDescriptor.Builder()
+ .name("el5-sampler-interval")
+ .displayName("Sampler Interval")
+ .description("How often to sample / ping the nodes listed and
connected. For example, 5s (5 seconds). "
+ + "If non-local recommended is 30s.")
+ .required(true)
+ .defaultValue("5s")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected AtomicReference<Client> esClient = new AtomicReference<>();
+ protected List<InetSocketAddress> esHosts;
+
+ /**
+ * Instantiate ElasticSearch Client. This should be called by
subclasses' @OnScheduled method to create a client
+ * if one does not yet exist. If called when scheduled, closeClient()
should be called by the subclasses' @OnStopped
+ * method so the client will be destroyed when the processor is
stopped.
+ *
+ * @param context The context for this processor
+ * @throws ProcessException if an error occurs while creating an
Elasticsearch client
+ */
+ @Override
+ protected void createElasticsearchClient(ProcessContext context)
throws ProcessException {
+
+ ComponentLog log = getLogger();
+ if (esClient.get() != null) {
+ return;
+ }
+
+ log.debug("Creating ElasticSearch Client");
+ try {
+ final String clusterName =
context.getProperty(CLUSTER_NAME).getValue();
+ final String pingTimeout =
context.getProperty(PING_TIMEOUT).getValue();
+ final String samplerInterval =
context.getProperty(SAMPLER_INTERVAL).getValue();
+ final String username =
context.getProperty(USERNAME).getValue();
+ final String password =
context.getProperty(PASSWORD).getValue();
+
+ final SSLContextService sslService =
+
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+ Settings.Builder settingsBuilder = Settings.builder()
+ .put("cluster.name", clusterName)
+ .put("client.transport.ping_timeout", pingTimeout)
+ .put("client.transport.nodes_sampler_interval",
samplerInterval);
+
+ String xPackUrl =
context.getProperty(PROP_XPACK_LOCATION).getValue();
+ if (sslService != null) {
+
settingsBuilder.put("xpack.security.transport.ssl.enabled", "true")
+ .put("xpack.ssl.keystore.path",
sslService.getKeyStoreFile())
+ .put("xpack.ssl.keystore.password",
sslService.getKeyStorePassword())
--- End diff --
"xpack.ssl.keystore.key_password" could also be set with
"sslService.getKeyPassword()"
> Support Elasticsearch 5.0 for Put/FetchElasticsearch processors
> ---------------------------------------------------------------
>
> Key: NIFI-3011
> URL: https://issues.apache.org/jira/browse/NIFI-3011
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> Now that Elastic has released a new major version (5.0) of Elasticsearch, the
> Put/FetchElasticsearch processors would need to be upgraded (or duplicated)
> as the major version of the transport client needs to match the major version
> of the Elasticsearch cluster.
> If upgrade is selected, then Put/FetchES will no longer work with
> Elasticsearch 2.x clusters, so in that case users would want to switch to the
> Http versions of those processors. However this might not be desirable (due
> to performance concerns with the HTTP API vs the transport API), so care must
> be taken when deciding whether to upgrade the existing processors or create
> new ones.
> Creating new versions of these processors (to use the 5.0 transport client)
> will also take some consideration, as it is unlikely the different versions
> can coexist in the same NAR due to classloading issues (multiple versions of
> JARs containing the same class names, e.g.). It may be necessary to create an
> "elasticsearch-5.0" version of the NAR, containing only the new versions of
> these processors.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)