[ https://issues.apache.org/jira/browse/NIFI-3011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671837#comment-15671837 ]

ASF GitHub Bot commented on NIFI-3011:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1233#discussion_r88340765
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-5-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.elasticsearch.ElasticsearchTimeoutException;
    +import org.elasticsearch.action.get.GetRequestBuilder;
    +import org.elasticsearch.action.get.GetResponse;
    +import org.elasticsearch.client.transport.NoNodeAvailableException;
    +import org.elasticsearch.node.NodeClosedException;
    +import org.elasticsearch.transport.ReceiveTimeoutTransportException;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({"elasticsearch", "elasticsearch 5", "fetch", "read", "get"})
    +@CapabilityDescription("Retrieves a document from Elasticsearch using the 
specified connection properties and the "
    +        + "identifier of the document to retrieve. If the cluster has been 
configured for authorization and/or secure "
    +        + "transport (SSL/TLS), and the X-Pack plugin is available, secure 
connections can be made. This processor "
    +        + "supports Elasticsearch 5.x clusters.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "The 
filename attributes is set to the document identifier"),
    +        @WritesAttribute(attribute = "es.index", description = "The 
Elasticsearch index containing the document"),
    +        @WritesAttribute(attribute = "es.type", description = "The 
Elasticsearch document type")
    +})
    +public class FetchElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
    +            .description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed")
    +            .build();
    +
    +    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
    +            .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster")
    +            .build();
    +
    +    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder()
    +            .name("el5-fetch-doc-id")
    +            .displayName("Document Identifier")
    +            .description("The identifier for the document to be fetched")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
    +            .name("el5-fetch-index")
    +            .displayName("Index")
    +            .description("The name of the index to read from")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
    +            .name("el5-fetch-type")
    +            .displayName("Type")
    +            .description("The type of this document (used by Elasticsearch 
for indexing and searching)")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.add(REL_RETRY);
    +        relationships.add(REL_NOT_FOUND);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(CLUSTER_NAME);
    +        descriptors.add(HOSTS);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(PROP_XPACK_LOCATION);
    +        descriptors.add(USERNAME);
    +        descriptors.add(PASSWORD);
    +        descriptors.add(PING_TIMEOUT);
    +        descriptors.add(SAMPLER_INTERVAL);
    +        descriptors.add(DOC_ID);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(CHARSET);
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        super.setup(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
    +        final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
    +        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
    +        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        final ComponentLog logger = getLogger();
    +        try {
    +
    +            logger.debug("Fetching {}/{}/{} from Elasticsearch", new 
Object[]{index, docType, docId});
    +            GetRequestBuilder getRequestBuilder = 
esClient.get().prepareGet(index, docType, docId);
    +            final GetResponse getResponse = 
getRequestBuilder.execute().actionGet();
    +
    +            if (getResponse == null || !getResponse.isExists()) {
    +                logger.warn("Failed to read {}/{}/{} from Elasticsearch: 
Document not found",
    +                        new Object[]{index, docType, docId});
    +
    +                // We couldn't find the document, so penalize it and send it to "not found"
    +                flowFile = session.penalize(flowFile);
    +                session.transfer(flowFile, REL_NOT_FOUND);
    +            } else {
    +                flowFile = session.putAttribute(flowFile, "filename", 
docId);
    +                flowFile = session.putAttribute(flowFile, "es.index", 
index);
    +                flowFile = session.putAttribute(flowFile, "es.type", 
docType);
    +                flowFile = session.write(flowFile, new 
OutputStreamCallback() {
    +                    @Override
    +                    public void process(OutputStream out) throws IOException {
    +                        out.write(getResponse.getSourceAsString().getBytes(charset));
    +                    }
    +                });
    +                logger.debug("Elasticsearch document " + docId + " 
fetched, routing to success");
    +                // The document is JSON, so update the MIME type of the 
flow file
    +                flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/json");
    +                session.transfer(flowFile, REL_SUCCESS);
    +            }
    +        } catch (NoNodeAvailableException
    +                | ElasticsearchTimeoutException
    +                | ReceiveTimeoutTransportException
    +                | NodeClosedException exceptionToRetry) {
    +            logger.error("Failed to read into Elasticsearch due to {}, 
this may indicate an error in configuration "
    --- End diff ---
    
    The first part of this message seems to be at odds with the transfer to REL_RETRY. Maybe add an 'or' clause at the end: "... or this issue may be transient. Routing to Retry".
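
A minimal sketch of the kind of rewording suggested here, reusing the names from the diff above (exceptionToRetry, docId, logger, session, context, REL_RETRY). The original message string and the retry handling are truncated in the diff, so everything past the quoted fragment is illustrative, not the PR's actual code:

    } catch (NoNodeAvailableException
            | ElasticsearchTimeoutException
            | ReceiveTimeoutTransportException
            | NodeClosedException exceptionToRetry) {
        // Illustrative wording only: call out that the failure may be transient,
        // which lines up with routing the FlowFile to the retry relationship
        logger.error("Failed to read {} from Elasticsearch due to {}; this may indicate a configuration problem, "
                + "or the issue may be transient. Routing to retry",
                new Object[]{docId, exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
        flowFile = session.penalize(flowFile);
        session.transfer(flowFile, REL_RETRY);
        context.yield();   // assumed: back off while the cluster is unreachable
    }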


> Support Elasticsearch 5.0 for Put/FetchElasticsearch processors
> ---------------------------------------------------------------
>
>                 Key: NIFI-3011
>                 URL: https://issues.apache.org/jira/browse/NIFI-3011
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>
> Now that Elastic has released a new major version (5.0) of Elasticsearch, the 
> Put/FetchElasticsearch processors would need to be upgraded (or duplicated) 
> as the major version of the transport client needs to match the major version 
> of the Elasticsearch cluster.
> If upgrade is selected, then Put/FetchES will no longer work with 
> Elasticsearch 2.x clusters, so in that case users would want to switch to the 
> Http versions of those processors. However, this might not be desirable (due 
> to performance concerns with the HTTP API vs. the transport API), so care must 
> be taken when deciding whether to upgrade the existing processors or create 
> new ones.
> Creating new versions of these processors (to use the 5.0 transport client) 
> will also take some consideration, as it is unlikely the different versions 
> can coexist in the same NAR due to classloading issues (e.g., multiple 
> versions of JARs containing the same class names). It may be necessary to 
> create an "elasticsearch-5.0" version of the NAR, containing only the new 
> versions of these processors.
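
To make the classloading point concrete, the 5.x transport client is bootstrapped roughly as sketched below. The 2.x client uses the same fully qualified class names (Settings, TransportClient, InetSocketTransportAddress) but a different builder API (TransportClient.builder().settings(...).build()) and different JARs, so the two client versions cannot safely share one NAR's classpath. This sketch is not taken from the PR; the host, port, and cluster name are placeholders:

    import java.net.InetAddress;

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    public class Es5TransportClientSketch {
        // Builds an Elasticsearch 5.x transport client; PreBuiltTransportClient only
        // exists in the 5.x transport artifact (org.elasticsearch.client:transport)
        public static Client buildClient() throws Exception {
            Settings settings = Settings.builder()
                    .put("cluster.name", "my-cluster")   // placeholder cluster name
                    .build();
            return new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("es-host"), 9300));  // placeholder host/port
        }
    }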



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
