[
https://issues.apache.org/jira/browse/NIFI-2068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362629#comment-15362629
]
ASF GitHub Bot commented on NIFI-2068:
--------------------------------------
GitHub user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/576#discussion_r69582751
--- Diff:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
---
@@ -194,65 +209,61 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
final byte[] bodyBytes = body.bytes();
JsonNode responseJson = parseJsonResponse(new
ByteArrayInputStream(bodyBytes));
boolean found = responseJson.get("found").asBoolean(false);
+ String retrievedIndex =
responseJson.get("_index").asText();
+ String retrievedType = responseJson.get("_type").asText();
+ String retrievedId = responseJson.get("_id").asText();
if (found) {
- flowFile = session.putAttribute(flowFile, "filename",
docId);
- flowFile = session.putAttribute(flowFile, "es.index",
index);
- flowFile = session.putAttribute(flowFile, "es.type",
docType);
+ String source = responseJson.get("_source").toString();
+ flowFile = session.putAttribute(flowFile, "filename",
retrievedId);
+ flowFile = session.putAttribute(flowFile, "es.index",
retrievedIndex);
+ flowFile = session.putAttribute(flowFile, "es.type",
retrievedType);
flowFile = session.write(flowFile, out -> {
- out.write(bodyBytes);
+ out.write(source.getBytes());
});
- logger.debug("Elasticsearch document " + docId + "
fetched, routing to success");
+ logger.debug("Elasticsearch document " + retrievedId +
" fetched, routing to success");
// emit provenance event
final long millis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().fetch(flowFile,
url.toExternalForm(), millis);
+ if (context.hasIncomingConnection()) {
+ session.getProvenanceReporter().fetch(flowFile,
url.toExternalForm(), millis);
+ } else {
+ session.getProvenanceReporter().receive(flowFile,
url.toExternalForm(), millis);
+ }
session.transfer(flowFile, REL_SUCCESS);
} else {
logger.warn("Failed to read {}/{}/{} from
Elasticsearch: Document not found",
new Object[]{index, docType, docId});
- // We couldn't find the document, so penalize it and
send it to "not found"
- // If there was an incoming flow file, penalize and
transfer. Otherwise just remove it
- if (context.hasIncomingConnection()) {
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_NOT_FOUND);
- } else {
- session.remove(flowFile);
- }
+ // We couldn't find the document, so send it to "not
found"
+ session.transfer(flowFile, REL_NOT_FOUND);
}
-
} else {
if (statusCode == 404) {
logger.warn("Failed to read {}/{}/{} from
Elasticsearch: Document not found",
new Object[]{index, docType, docId});
// We couldn't find the document, so penalize it and
send it to "not found"
- // If there was an incoming flow file, penalize and
transfer. Otherwise just remove it
- if (context.hasIncomingConnection()) {
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_NOT_FOUND);
- } else {
- session.remove(flowFile);
- }
- } else if (context.hasIncomingConnection()) {
- // 5xx -> RETRY
+ session.transfer(flowFile, REL_NOT_FOUND);
+ } else {
+ // 5xx -> RETRY, but a server error might last a
while, so yield
if (statusCode / 100 == 5) {
- logger.info("Elasticsearch returned code {} with
message {}, transferring flow file to retry", new Object[]{statusCode,
getResponse.message()});
- flowFile = session.penalize(flowFile);
--- End diff --
Why remove the `session.penalize(flowFile)` call here?
> Add Elasticsearch processors that use the REST API
> --------------------------------------------------
>
> Key: NIFI-2068
> URL: https://issues.apache.org/jira/browse/NIFI-2068
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Fix For: 1.0.0
>
>
> The current Elasticsearch processors use the Transport Client, and as a
> result there can be compatibility issues between different versions of ES
> clusters. The REST API is much more consistent across versions, so it would be
> nice to have ES processors that use the REST API, to enable things like
> migration from an Elasticsearch cluster running an older version to a cluster
> running a newer version.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)