[ https://issues.apache.org/jira/browse/NIFI-3011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671837#comment-15671837 ]
ASF GitHub Bot commented on NIFI-3011: -------------------------------------- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/1233#discussion_r88340765 --- Diff: nifi-nar-bundles/nifi-elasticsearch-5-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.get.GetRequestBuilder; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.transport.ReceiveTimeoutTransportException; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "elasticsearch 5", "fetch", "read", "get"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " + + "identifier of the document to retrieve. If the cluster has been configured for authorization and/or secure " + + "transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made. This processor " + + "supports Elasticsearch 5.x clusters.") +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"), + @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), + @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") +}) +public class FetchElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + + public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") + .description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed") + .build(); + + public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") + .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster") + .build(); + + public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder() + .name("el5-fetch-doc-id") + .displayName("Document Identifier") + .description("The identifier for the document to be fetched") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("el5-fetch-index") + .displayName("Index") + .description("The name of the index to read from") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("el5-fetch-type") + .displayName("Type") + .description("The type of this document (used by Elasticsearch for indexing and searching)") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_RETRY); + relationships.add(REL_NOT_FOUND); + return Collections.unmodifiableSet(relationships); + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(CLUSTER_NAME); + descriptors.add(HOSTS); + descriptors.add(PROP_SSL_CONTEXT_SERVICE); + descriptors.add(PROP_XPACK_LOCATION); + descriptors.add(USERNAME); + descriptors.add(PASSWORD); + descriptors.add(PING_TIMEOUT); + descriptors.add(SAMPLER_INTERVAL); + descriptors.add(DOC_ID); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(CHARSET); + + return Collections.unmodifiableList(descriptors); + } + + + @OnScheduled + public void setup(ProcessContext context) { + super.setup(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue(); + final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue(); + final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + + final ComponentLog logger = getLogger(); + try { + + logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId}); + GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId); + final GetResponse getResponse = getRequestBuilder.execute().actionGet(); + + if (getResponse == null || !getResponse.isExists()) { + logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found", + new Object[]{index, docType, docId}); + + // We couldn't find the document, so penalize it and send it to "not found" + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_NOT_FOUND); + } else { + flowFile = session.putAttribute(flowFile, "filename", docId); + flowFile = session.putAttribute(flowFile, "es.index", index); + flowFile = session.putAttribute(flowFile, "es.type", docType); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write(getResponse.getSourceAsString().getBytes(charset)); + } + }); + logger.debug("Elasticsearch document " + docId + " fetched, routing to success"); + // The document is JSON, so update the MIME type of the flow file + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); + session.transfer(flowFile, REL_SUCCESS); + } + } catch (NoNodeAvailableException + | ElasticsearchTimeoutException + | ReceiveTimeoutTransportException + | NodeClosedException exceptionToRetry) { + logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration " --- End diff -- The first part of this message seems to be at odds with the transfer to REL_RETRY. Maybe add an 'or' statement at the end "... or this issue may be transient. Routing to Retry" > Support Elasticsearch 5.0 for Put/FetchElasticsearch processors > --------------------------------------------------------------- > > Key: NIFI-3011 > URL: https://issues.apache.org/jira/browse/NIFI-3011 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Matt Burgess > Assignee: Matt Burgess > > Now that Elastic has released a new major version (5.0) of Elasticsearch, the > Put/FetchElasticsearch processors would need to be upgraded (or duplicated) > as the major version of the transport client needs to match the major version > of the Elasticsearch cluster. > If upgrade is selected, then Put/FetchES will no longer work with > Elasticsearch 2.x clusters, so in that case users would want to switch to the > Http versions of those processors. However this might not be desirable (due > to performance concerns with the HTTP API vs the transport API), so care must > be taken when deciding whether to upgrade the existing processors or create > new ones. > Creating new versions of these processors (to use the 5.0 transport client) > will also take some consideration, as it is unlikely the different versions > can coexist in the same NAR due to classloading issues (multiple versions of > JARs containing the same class names, e.g.). It may be necessary to create an > "elasticsearch-5.0" version of the NAR, containing only the new versions of > these processors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)