[ https://issues.apache.org/jira/browse/NIFI-2068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362627#comment-15362627 ]
ASF GitHub Bot commented on NIFI-2068:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/576#discussion_r69582549
--- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java ---
@@ -194,65 +209,61 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final byte[] bodyBytes = body.bytes();
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
boolean found = responseJson.get("found").asBoolean(false);
+ String retrievedIndex = responseJson.get("_index").asText();
+ String retrievedType = responseJson.get("_type").asText();
+ String retrievedId = responseJson.get("_id").asText();
if (found) {
- flowFile = session.putAttribute(flowFile, "filename", docId);
- flowFile = session.putAttribute(flowFile, "es.index", index);
- flowFile = session.putAttribute(flowFile, "es.type", docType);
+ String source = responseJson.get("_source").toString();
+ flowFile = session.putAttribute(flowFile, "filename", retrievedId);
+ flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
+ flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
flowFile = session.write(flowFile, out -> {
- out.write(bodyBytes);
+ out.write(source.getBytes());
});
- logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
+ logger.debug("Elasticsearch document " + retrievedId + " fetched, routing to success");
// emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().fetch(flowFile, url.toExternalForm(), millis);
+ if (context.hasIncomingConnection()) {
--- End diff --
This should probably be "hasNonLoopConnection" instead of
"hasIncomingConnection". That way, if the processor is acting as a source but
Rel_Retry is routed back to it, it will still properly emit "receive".
Alternatively, set a flag at the beginning of onTrigger that records whether
the flowfile was created here or came in over an incoming connection.
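
To illustrate the difference, here is a minimal sketch rather than the
processor's actual code. It models the two checks with plain strings instead
of calling the NiFi API: the class name ProvenanceEventSketch, the helper
signatures, and the idea of listing incoming connections by the name of their
source processor are all assumptions made for the example. It also assumes the
branch in the diff emits "fetch" when the check is true and "receive"
otherwise.

```java
import java.util.Collections;
import java.util.List;

// Illustrative model only: these helpers mimic the two ProcessContext checks
// with plain strings and do not call any NiFi API.
final class ProvenanceEventSketch {

    // True if anything at all feeds the processor, including its own retry loop.
    static boolean hasIncomingConnection(List<String> incomingSources) {
        return !incomingSources.isEmpty();
    }

    // True only if at least one incoming connection starts at another processor.
    static boolean hasNonLoopConnection(String self, List<String> incomingSources) {
        for (String source : incomingSources) {
            if (!self.equals(source)) {
                return true;
            }
        }
        return false;
    }

    // "fetch" when the flow file came from upstream, "receive" when this
    // processor is acting as the source.
    static String provenanceEvent(boolean fedByUpstream) {
        return fedByUpstream ? "fetch" : "receive";
    }

    public static void main(String[] args) {
        String self = "FetchElasticsearchHttp";
        // The only incoming connection is the retry relationship looped back.
        List<String> retryLoopOnly = Collections.singletonList(self);

        // prints "fetch": the loop counts as an incoming connection
        System.out.println(provenanceEvent(hasIncomingConnection(retryLoopOnly)));
        // prints "receive": the loop is ignored, so the processor is a source
        System.out.println(provenanceEvent(hasNonLoopConnection(self, retryLoopOnly)));
    }
}
```

With only the retry loop attached, the first check reports an incoming
connection and the event becomes "fetch", while the non-loop check still
treats the processor as a source. The flag alternative would sidestep the
question by recording, at the top of onTrigger, whether the flowfile was
pulled from the session or created on the spot.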
> Add Elasticsearch processors that use the REST API
> --------------------------------------------------
>
> Key: NIFI-2068
> URL: https://issues.apache.org/jira/browse/NIFI-2068
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Fix For: 1.0.0
>
>
> The current Elasticsearch processors use the Transport Client, and as a
> result there can be some compatibility issues between multiple versions of ES
> clusters. The REST API is much more standard between versions, so it would be
> nice to have ES processors that use the REST API, to enable things like
> migration from an Elasticsearch cluster with an older version to a cluster
> with a newer version.