[
https://issues.apache.org/jira/browse/NIFI-3011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671837#comment-15671837
]
ASF GitHub Bot commented on NIFI-3011:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1233#discussion_r88340765
--- Diff:
nifi-nar-bundles/nifi-elasticsearch-5-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.get.GetRequestBuilder;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.elasticsearch.node.NodeClosedException;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"elasticsearch", "elasticsearch 5", "fetch", "read", "get"})
+@CapabilityDescription("Retrieves a document from Elasticsearch using the
specified connection properties and the "
+ + "identifier of the document to retrieve. If the cluster has been
configured for authorization and/or secure "
+ + "transport (SSL/TLS), and the X-Pack plugin is available, secure
connections can be made. This processor "
+ + "supports Elasticsearch 5.x clusters.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "filename", description = "The
filename attributes is set to the document identifier"),
+ @WritesAttribute(attribute = "es.index", description = "The
Elasticsearch index containing the document"),
+ @WritesAttribute(attribute = "es.type", description = "The
Elasticsearch document type")
+})
+public class FetchElasticsearch5 extends
AbstractElasticsearch5TransportClientProcessor {
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("All FlowFiles that are read from Elasticsearch
are routed to this relationship").build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("All FlowFiles that cannot be read from
Elasticsearch are routed to this relationship").build();
+
+ public static final Relationship REL_RETRY = new
Relationship.Builder().name("retry")
+ .description("A FlowFile is routed to this relationship if the
document cannot be fetched but attempting the operation again may succeed")
+ .build();
+
+ public static final Relationship REL_NOT_FOUND = new
Relationship.Builder().name("not found")
+ .description("A FlowFile is routed to this relationship if the
specified document does not exist in the Elasticsearch cluster")
+ .build();
+
+ public static final PropertyDescriptor DOC_ID = new
PropertyDescriptor.Builder()
+ .name("el5-fetch-doc-id")
+ .displayName("Document Identifier")
+ .description("The identifier for the document to be fetched")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor INDEX = new
PropertyDescriptor.Builder()
+ .name("el5-fetch-index")
+ .displayName("Index")
+ .description("The name of the index to read from")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TYPE = new
PropertyDescriptor.Builder()
+ .name("el5-fetch-type")
+ .displayName("Type")
+ .description("The type of this document (used by Elasticsearch
for indexing and searching)")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ relationships.add(REL_RETRY);
+ relationships.add(REL_NOT_FOUND);
+ return Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(CLUSTER_NAME);
+ descriptors.add(HOSTS);
+ descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+ descriptors.add(PROP_XPACK_LOCATION);
+ descriptors.add(USERNAME);
+ descriptors.add(PASSWORD);
+ descriptors.add(PING_TIMEOUT);
+ descriptors.add(SAMPLER_INTERVAL);
+ descriptors.add(DOC_ID);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(CHARSET);
+
+ return Collections.unmodifiableList(descriptors);
+ }
+
+
+ @OnScheduled
+ public void setup(ProcessContext context) {
+ super.setup(context);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+ final String docId =
context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
+ final String docType =
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+ final Charset charset =
Charset.forName(context.getProperty(CHARSET).getValue());
+
+ final ComponentLog logger = getLogger();
+ try {
+
+ logger.debug("Fetching {}/{}/{} from Elasticsearch", new
Object[]{index, docType, docId});
+ GetRequestBuilder getRequestBuilder =
esClient.get().prepareGet(index, docType, docId);
+ final GetResponse getResponse =
getRequestBuilder.execute().actionGet();
+
+ if (getResponse == null || !getResponse.isExists()) {
+ logger.warn("Failed to read {}/{}/{} from Elasticsearch:
Document not found",
+ new Object[]{index, docType, docId});
+
+ // We couldn't find the document, so penalize it and send
it to "not found"
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_NOT_FOUND);
+ } else {
+ flowFile = session.putAttribute(flowFile, "filename",
docId);
+ flowFile = session.putAttribute(flowFile, "es.index",
index);
+ flowFile = session.putAttribute(flowFile, "es.type",
docType);
+ flowFile = session.write(flowFile, new
OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws
IOException {
+
out.write(getResponse.getSourceAsString().getBytes(charset));
+ }
+ });
+ logger.debug("Elasticsearch document " + docId + "
fetched, routing to success");
+ // The document is JSON, so update the MIME type of the
flow file
+ flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), "application/json");
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+ } catch (NoNodeAvailableException
+ | ElasticsearchTimeoutException
+ | ReceiveTimeoutTransportException
+ | NodeClosedException exceptionToRetry) {
+ logger.error("Failed to read into Elasticsearch due to {},
this may indicate an error in configuration "
--- End diff --
The first part of this message seems to be at odds with the transfer to
REL_RETRY. Maybe add an 'or' clause at the end: "... or this issue may be
transient. Routing to Retry."
> Support Elasticsearch 5.0 for Put/FetchElasticsearch processors
> ---------------------------------------------------------------
>
> Key: NIFI-3011
> URL: https://issues.apache.org/jira/browse/NIFI-3011
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> Now that Elastic has released a new major version (5.0) of Elasticsearch, the
> Put/FetchElasticsearch processors need to be upgraded (or duplicated),
> as the major version of the transport client must match the major version
> of the Elasticsearch cluster.
> If the existing processors are upgraded, Put/FetchES will no longer work with
> Elasticsearch 2.x clusters, so in that case users would want to switch to the
> HTTP versions of those processors. However, this might not be desirable (due
> to performance concerns with the HTTP API vs. the transport API), so care must
> be taken when deciding whether to upgrade the existing processors or create
> new ones.
> Creating new versions of these processors (to use the 5.0 transport client)
> will also require careful consideration, as it is unlikely that the different
> versions can coexist in the same NAR due to classloading issues (e.g., multiple
> versions of JARs containing the same class names). It may be necessary to create an
> "elasticsearch-5.0" version of the NAR, containing only the new versions of
> these processors.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)