[
https://issues.apache.org/jira/browse/NIFI-4002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033704#comment-16033704
]
ASF GitHub Bot commented on NIFI-4002:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1878#discussion_r119729793
--- Diff:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
---
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.commons.lang3.StringUtils.trimToEmpty;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write",
"put", "http", "record"})
+@CapabilityDescription("Writes the contents of a FlowFile to
Elasticsearch, using the specified parameters such as "
+ + "the index to insert into and the type of the document.")
+public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcessor {
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("All FlowFiles that are written to Elasticsearch
are routed to this relationship").build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("All FlowFiles that cannot be written to
Elasticsearch are routed to this relationship").build();
+
+ public static final Relationship REL_RETRY = new
Relationship.Builder().name("retry")
+ .description("A FlowFile is routed to this relationship if the
database cannot be updated but attempting the operation again may succeed")
+ .build();
+
+ static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("put-db-record-record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for
parsing incoming data and determining the data's schema.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor ID_RECORD_PATH = new
PropertyDescriptor.Builder()
+ .name("put-es-record-id-path")
+ .displayName("Identifier Record Path")
+ .description("A RecordPath pointing to a field in the
record(s) that contains the identifier for the document. If the Index Operation
is \"index\", "
+ + "this property may be left empty or evaluate to an
empty value, in which case the document's identifier will be "
+ + "auto-generated by Elasticsearch. For all other
Index Operations, the field's value must be non-empty.")
+ .required(false)
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(true)
+ .build();
+
+ static final PropertyDescriptor INDEX = new
PropertyDescriptor.Builder()
+ .name("put-es-record-index")
+ .displayName("Index")
+ .description("The name of the index to insert into")
+ .required(true)
+ .expressionLanguageSupported(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+ AttributeExpression.ResultType.STRING, true))
+ .build();
+
+ static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+ .name("put-es-record-type")
+ .displayName("Type")
+ .description("The type of this document (used by Elasticsearch
for indexing and searching)")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor INDEX_OP = new
PropertyDescriptor.Builder()
+ .name("put-es-record-index-op")
+ .displayName("Index Operation")
+ .description("The type of the operation used to index (index,
update, upsert, delete)")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .defaultValue("index")
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ private volatile RecordPathCache recordPathCache;
+
+ private final JsonFactory factory = new JsonFactory();
+
+ static {
+ final Set<Relationship> _rels = new HashSet<>();
+ _rels.add(REL_SUCCESS);
+ _rels.add(REL_FAILURE);
+ _rels.add(REL_RETRY);
+ relationships = Collections.unmodifiableSet(_rels);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ES_URL);
+ descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+ descriptors.add(USERNAME);
+ descriptors.add(PASSWORD);
+ descriptors.add(CONNECT_TIMEOUT);
+ descriptors.add(RESPONSE_TIMEOUT);
+ descriptors.add(RECORD_READER);
+ descriptors.add(ID_RECORD_PATH);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(INDEX_OP);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> problems = new
ArrayList<>(super.customValidate(validationContext));
+ // Since Expression Language is allowed for index operation, we
can't guarantee that we can catch
+ // all invalid configurations, but we should catch them as soon as
we can. For example, if the
+ // Identifier Record Path property is empty, the Index Operation
must evaluate to "index".
+ String idPath =
validationContext.getProperty(ID_RECORD_PATH).getValue();
+ String indexOp =
validationContext.getProperty(INDEX_OP).getValue();
+
+ if (StringUtils.isEmpty(idPath)) {
+ switch (indexOp.toLowerCase()) {
+ case "update":
+ case "upsert":
+ case "delete":
+ case "":
+ problems.add(new ValidationResult.Builder()
+ .valid(false)
+ .subject(INDEX_OP.getDisplayName())
+ .explanation("If Identifier Record Path is not
set, Index Operation must evaluate to \"index\"")
+ .build());
+ break;
+ default:
+ break;
+ }
+ }
+ return problems;
+ }
+
+ @OnScheduled
+ public void setup(ProcessContext context) {
+ super.setup(context);
+ recordPathCache = new RecordPathCache(10);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+ // Authentication
+ final String username =
context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String password =
context.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
+
+ OkHttpClient okHttpClient = getClient();
+ final ComponentLog logger = getLogger();
+
+ final String baseUrl =
trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
+ final URL url;
+ try {
+ url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl +
"/") + "_bulk");
+ } catch (MalformedURLException mue) {
+ // Since we have a URL validator, something has gone very
wrong, throw a ProcessException
+ context.yield();
+ throw new ProcessException(mue);
+ }
+
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isEmpty(index)) {
+ logger.error("No value for index in for {}, transferring to
failure", new Object[]{flowFile});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ final String docType =
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+ String indexOp =
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isEmpty(indexOp)) {
+ logger.error("No Index operation specified for {},
transferring to failure.", new Object[]{flowFile});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ switch (indexOp.toLowerCase()) {
+ case "index":
+ case "update":
+ case "upsert":
+ case "delete":
+ break;
+ default:
+ logger.error("Index operation {} not supported for {},
transferring to failure.", new Object[]{indexOp, flowFile});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final String id_path =
context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+ final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null
: recordPathCache.getCompiled(id_path);
+ final StringBuilder sb = new StringBuilder();
+
+ try (final InputStream in = session.read(flowFile);
+ final RecordReader reader =
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+
+ final String id;
+ if (recordPath != null) {
+ Optional<FieldValue> idPathValue =
recordPath.evaluate(record).getSelectedFields().findFirst();
+ if (!idPathValue.isPresent() ||
idPathValue.get().getValue() == null) {
+ throw new IdentifierNotFoundException("Identifier
Record Path specified but no value was found, transferring {} to failure.");
+ }
+ id = idPathValue.get().getValue().toString();
+ } else {
+ id = null;
+ }
+
+ // The ID must be valid for all operations except "index".
For that case,
+ // a missing ID indicates one is to be auto-generated by
Elasticsearch
+ if (id == null && !indexOp.equalsIgnoreCase("index")) {
+ throw new IdentifierNotFoundException("Index operation
{} requires a valid identifier value from a flow file attribute, transferring
to failure.");
+ }
+
+ final StringBuilder json = new StringBuilder();
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator generator = factory.createJsonGenerator(out);
+ writeRecord(record, record.getSchema(), generator);
+ generator.flush();
+ generator.close();
+ json.append(out.toString());
+
+ if (indexOp.equalsIgnoreCase("index")) {
+ sb.append("{\"index\": { \"_index\": \"");
+ sb.append(index);
+ sb.append("\", \"_type\": \"");
+ sb.append(docType);
+ sb.append("\"");
+ if (!StringUtils.isEmpty(id)) {
+ sb.append(", \"_id\": \"");
+ sb.append(id);
+ sb.append("\"");
+ }
+ sb.append("}}\n");
+ sb.append(json);
+ sb.append("\n");
+ } else if (indexOp.equalsIgnoreCase("upsert") ||
indexOp.equalsIgnoreCase("update")) {
+ sb.append("{\"update\": { \"_index\": \"");
+ sb.append(index);
+ sb.append("\", \"_type\": \"");
+ sb.append(docType);
+ sb.append("\", \"_id\": \"");
+ sb.append(id);
+ sb.append("\" }\n");
+ sb.append("{\"doc\": ");
+ sb.append(json);
+ sb.append(", \"doc_as_upsert\": ");
+ sb.append(indexOp.equalsIgnoreCase("upsert"));
+ sb.append(" }\n");
+ } else if (indexOp.equalsIgnoreCase("delete")) {
+ sb.append("{\"delete\": { \"_index\": \"");
+ sb.append(index);
+ sb.append("\", \"_type\": \"");
+ sb.append(docType);
+ sb.append("\", \"_id\": \"");
+ sb.append(id);
+ sb.append("\" }\n");
+ }
+ }
+ } catch (IdentifierNotFoundException infe) {
+ logger.error(infe.getMessage(), new Object[]{flowFile});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+
+ } catch (final IOException | SchemaNotFoundException |
MalformedRecordException e) {
+ throw new ProcessException("Could not parse incoming data", e);
+ }
+
+ RequestBody requestBody =
RequestBody.create(MediaType.parse("application/json"), sb.toString());
+ final Response getResponse;
+ try {
+ getResponse = sendRequestToElasticsearch(okHttpClient, url,
username, password, "PUT", requestBody);
+ } catch (final Exception e) {
+ logger.error("Routing to {} due to exception: {}", new
Object[]{REL_FAILURE.getName(), e}, e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ final int statusCode = getResponse.code();
+
+ if (isSuccess(statusCode)) {
+ ResponseBody responseBody = getResponse.body();
+ try {
+ final byte[] bodyBytes = responseBody.bytes();
+
+ JsonNode responseJson = parseJsonResponse(new
ByteArrayInputStream(bodyBytes));
+ boolean errors =
responseJson.get("errors").asBoolean(false);
+ // ES has no rollback, so if errors occur, log them and
route the whole flow file to failure
+ if (errors) {
+ ArrayNode itemNodeArray = (ArrayNode)
responseJson.get("items");
+ if (itemNodeArray.size() > 0) {
+ // All items are returned whether they succeeded
or failed, so iterate through the item array
+ // at the same time as the flow file list, logging
failures accordingly
+ for (int i = itemNodeArray.size() - 1; i >= 0;
i--) {
+ JsonNode itemNode = itemNodeArray.get(i);
+ int status =
itemNode.findPath("status").asInt();
+ if (!isSuccess(status)) {
+ String reason =
itemNode.findPath("//error/reason").asText();
+ logger.error("Failed to insert {} into
Elasticsearch due to {}, transferring to failure",
+ new Object[]{flowFile, reason});
+ }
+ }
+ }
+ session.transfer(flowFile, REL_FAILURE);
+ } else {
+ session.transfer(flowFile, REL_SUCCESS);
--- End diff --
Yep definitely, forgot to add it, will add now
> Add PutElasticsearchHttpRecord processor
> ----------------------------------------
>
> Key: NIFI-4002
> URL: https://issues.apache.org/jira/browse/NIFI-4002
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> With the new Record Reader/Writer capabilities, and that the PutElasticsearch
> processors only handle a single document at a time, it would be nice to have
> a "record-aware" PutES processor, where the user could specify a Record
> Reader and for each record in the input, the processor would convert the
> record to JSON and push to an ES cluster. One necessary (but optional)
> property would be a Record Path to a field/value to be used as the document
> identifier (the current PutES processors require an attribute to hold the
> identifier). If the Record Path is not set and the operation is "index", then
> just like the PutES processors, the document identifier will be autogenerated.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)