[ https://issues.apache.org/jira/browse/NIFI-2068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362627#comment-15362627 ]

ASF GitHub Bot commented on NIFI-2068:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/576#discussion_r69582549
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java ---
    @@ -194,65 +209,61 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     final byte[] bodyBytes = body.bytes();
                     JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
                     boolean found = responseJson.get("found").asBoolean(false);
    +                String retrievedIndex = responseJson.get("_index").asText();
    +                String retrievedType = responseJson.get("_type").asText();
    +                String retrievedId = responseJson.get("_id").asText();
     
                     if (found) {
    -                    flowFile = session.putAttribute(flowFile, "filename", docId);
    -                    flowFile = session.putAttribute(flowFile, "es.index", index);
    -                    flowFile = session.putAttribute(flowFile, "es.type", docType);
    +                    String source = responseJson.get("_source").toString();
    +                    flowFile = session.putAttribute(flowFile, "filename", retrievedId);
    +                    flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
    +                    flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
                         flowFile = session.write(flowFile, out -> {
    -                        out.write(bodyBytes);
    +                        out.write(source.getBytes());
                         });
    -                    logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
    +                    logger.debug("Elasticsearch document " + retrievedId + " fetched, routing to success");
     
                         // emit provenance event
                         final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    -                    session.getProvenanceReporter().fetch(flowFile, url.toExternalForm(), millis);
    +                    if (context.hasIncomingConnection()) {
    --- End diff --
    
    This should probably be "hasNonLoopConnection" instead of
    "hasIncomingConnection". That way, if the processor is acting as a source
    but the REL_RETRY relationship is routed back to it, it will still correctly
    emit "receive". Alternatively, set a flag at the beginning of onTrigger that
    records whether the flowfile was created here or came from an incoming
    connection.
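
    A minimal sketch of the two options in the review comment above. It does
    not use the NiFi API: the Topology interface, the Event enum, and every
    method name except hasIncomingConnection/hasNonLoopConnection are
    hypothetical stand-ins. The diff is cut off at the "if", so the sketch
    assumes the branch emits "fetch" when true and "receive" otherwise.

```java
public class ProvenanceChoiceSketch {

    /** Hypothetical stand-in for the two provenance events under discussion. */
    enum Event { RECEIVE, FETCH }

    /** Hypothetical stand-in for the two context checks named in the review. */
    interface Topology {
        boolean hasIncomingConnection(); // any inbound connection, self-loops included
        boolean hasNonLoopConnection();  // an inbound connection from another component
    }

    /** Helper (hypothetical) to build a Topology for the demo and tests. */
    static Topology topology(final boolean incoming, final boolean nonLoop) {
        return new Topology() {
            @Override public boolean hasIncomingConnection() { return incoming; }
            @Override public boolean hasNonLoopConnection() { return nonLoop; }
        };
    }

    /** The check as written in the diff (branch bodies are an assumption). */
    static Event chooseAsInDiff(Topology t) {
        return t.hasIncomingConnection() ? Event.FETCH : Event.RECEIVE;
    }

    /** Option 1 from the review: ignore connections that loop back to self. */
    static Event chooseByNonLoop(Topology t) {
        return t.hasNonLoopConnection() ? Event.FETCH : Event.RECEIVE;
    }

    /** Option 2 from the review: a flag recorded at the start of onTrigger. */
    static Event chooseByFlag(boolean flowFileCreatedHere) {
        return flowFileCreatedHere ? Event.RECEIVE : Event.FETCH;
    }

    public static void main(String[] args) {
        // A source processor whose only inbound connection is its own retry loop.
        Topology sourceWithRetryLoop = topology(true, false);

        System.out.println(chooseAsInDiff(sourceWithRetryLoop));  // FETCH
        System.out.println(chooseByNonLoop(sourceWithRetryLoop)); // RECEIVE
        System.out.println(chooseByFlag(true));                   // RECEIVE
    }
}
```

    The first line of output is the case the comment is concerned with: a
    self-looped retry counts as an incoming connection, so the check in the
    diff would report "fetch" for a processor that is really acting as a
    source.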


> Add Elasticsearch processors that use the REST API
> --------------------------------------------------
>
>                 Key: NIFI-2068
>                 URL: https://issues.apache.org/jira/browse/NIFI-2068
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> The current Elasticsearch processors use the Transport Client, and as a 
> result there can be some compatibility issues between multiple versions of ES 
> clusters. The REST API is much more standard between versions, so it would be 
> nice to have ES processors that use the REST API, to enable things like 
> migration from an Elasticsearch cluster with an older version to a cluster 
> with a newer version.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
