[ 
https://issues.apache.org/jira/browse/NIFI-2068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362638#comment-15362638
 ] 

ASF GitHub Bot commented on NIFI-2068:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/576#discussion_r69584216
  
    --- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
 ---
    @@ -194,65 +209,61 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
                     final byte[] bodyBytes = body.bytes();
                     JsonNode responseJson = parseJsonResponse(new 
ByteArrayInputStream(bodyBytes));
                     boolean found = responseJson.get("found").asBoolean(false);
    +                String retrievedIndex = 
responseJson.get("_index").asText();
    +                String retrievedType = responseJson.get("_type").asText();
    +                String retrievedId = responseJson.get("_id").asText();
     
                     if (found) {
    -                    flowFile = session.putAttribute(flowFile, "filename", 
docId);
    -                    flowFile = session.putAttribute(flowFile, "es.index", 
index);
    -                    flowFile = session.putAttribute(flowFile, "es.type", 
docType);
    +                    String source = responseJson.get("_source").toString();
    +                    flowFile = session.putAttribute(flowFile, "filename", 
retrievedId);
    +                    flowFile = session.putAttribute(flowFile, "es.index", 
retrievedIndex);
    +                    flowFile = session.putAttribute(flowFile, "es.type", 
retrievedType);
                         flowFile = session.write(flowFile, out -> {
    -                        out.write(bodyBytes);
    +                        out.write(source.getBytes());
                         });
    -                    logger.debug("Elasticsearch document " + docId + " 
fetched, routing to success");
    +                    logger.debug("Elasticsearch document " + retrievedId + 
" fetched, routing to success");
     
                         // emit provenance event
                         final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    -                    session.getProvenanceReporter().fetch(flowFile, 
url.toExternalForm(), millis);
    +                    if (context.hasIncomingConnection()) {
    +                        session.getProvenanceReporter().fetch(flowFile, 
url.toExternalForm(), millis);
    +                    } else {
    +                        session.getProvenanceReporter().receive(flowFile, 
url.toExternalForm(), millis);
    +                    }
                         session.transfer(flowFile, REL_SUCCESS);
                     } else {
                         logger.warn("Failed to read {}/{}/{} from 
Elasticsearch: Document not found",
                                 new Object[]{index, docType, docId});
     
    -                    // We couldn't find the document, so penalize it and 
send it to "not found"
    -                    // If there was an incoming flow file, penalize and 
transfer. Otherwise just remove it
    -                    if (context.hasIncomingConnection()) {
    -                        flowFile = session.penalize(flowFile);
    -                        session.transfer(flowFile, REL_NOT_FOUND);
    -                    } else {
    -                        session.remove(flowFile);
    -                    }
    +                    // We couldn't find the document, so send it to "not 
found"
    +                    session.transfer(flowFile, REL_NOT_FOUND);
                     }
    -
                 } else {
                     if (statusCode == 404) {
                         logger.warn("Failed to read {}/{}/{} from 
Elasticsearch: Document not found",
                                 new Object[]{index, docType, docId});
     
                         // We couldn't find the document, so penalize it and 
send it to "not found"
    -                    // If there was an incoming flow file, penalize and 
transfer. Otherwise just remove it
    -                    if (context.hasIncomingConnection()) {
    -                        flowFile = session.penalize(flowFile);
    -                        session.transfer(flowFile, REL_NOT_FOUND);
    -                    } else {
    -                        session.remove(flowFile);
    -                    }
    -                } else if (context.hasIncomingConnection()) {
    -                    // 5xx -> RETRY
    +                    session.transfer(flowFile, REL_NOT_FOUND);
    +                } else {
    +                    // 5xx -> RETRY, but a server error might last a 
while, so yield
                         if (statusCode / 100 == 5) {
    -                        logger.info("Elasticsearch returned code {} with 
message {}, transferring flow file to retry", new Object[]{statusCode, 
getResponse.message()});
    -                        flowFile = session.penalize(flowFile);
    --- End diff --
    
    Ah I see the context.yield.


> Add Elasticsearch processors that use the REST API
> --------------------------------------------------
>
>                 Key: NIFI-2068
>                 URL: https://issues.apache.org/jira/browse/NIFI-2068
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> The current Elasticsearch processors use the Transport Client, and as a 
> result there can be some compatibility issues between multiple versions of ES 
> clusters. The REST API is much more standard between versions, so it would be 
> nice to have ES processors that use the REST API, to enable things like 
> migration from an Elasticsearch cluster with an older version to a cluster 
> with a newer version.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to