[
https://issues.apache.org/jira/browse/NIFI-4002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033595#comment-16033595
]
ASF GitHub Bot commented on NIFI-4002:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1878#discussion_r119714959
--- Diff:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
---
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.commons.lang3.StringUtils.trimToEmpty;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write",
"put", "http", "record"})
+@CapabilityDescription("Writes the contents of a FlowFile to
Elasticsearch, using the specified parameters such as "
+ + "the index to insert into and the type of the document.")
+public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcessor {
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("All FlowFiles that are written to Elasticsearch
are routed to this relationship").build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("All FlowFiles that cannot be written to
Elasticsearch are routed to this relationship").build();
+
+ public static final Relationship REL_RETRY = new
Relationship.Builder().name("retry")
+ .description("A FlowFile is routed to this relationship if the
database cannot be updated but attempting the operation again may succeed")
+ .build();
+
+ static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("put-db-record-record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for
parsing incoming data and determining the data's schema.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor ID_RECORD_PATH = new
PropertyDescriptor.Builder()
+ .name("put-es-record-id-path")
+ .displayName("Identifier Record Path")
+ .description("A RecordPath pointing to a field in the
record(s) that contains the identifier for the document. If the Index Operation
is \"index\", "
+ + "this property may be left empty or evaluate to an
empty value, in which case the document's identifier will be "
+ + "auto-generated by Elasticsearch. For all other
Index Operations, the field's value must be non-empty.")
+ .required(false)
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(true)
+ .build();
+
+ static final PropertyDescriptor INDEX = new
PropertyDescriptor.Builder()
+ .name("put-es-record-index")
+ .displayName("Index")
+ .description("The name of the index to insert into")
+ .required(true)
+ .expressionLanguageSupported(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+ AttributeExpression.ResultType.STRING, true))
+ .build();
+
+ static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+ .name("put-es-record-type")
+ .displayName("Type")
+ .description("The type of this document (used by Elasticsearch
for indexing and searching)")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor INDEX_OP = new
PropertyDescriptor.Builder()
+ .name("put-es-record-index-op")
+ .displayName("Index Operation")
+ .description("The type of the operation used to index (index,
update, upsert, delete)")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .defaultValue("index")
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ private volatile RecordPathCache recordPathCache;
+
+ private final JsonFactory factory = new JsonFactory();
+
+ static {
+ final Set<Relationship> _rels = new HashSet<>();
+ _rels.add(REL_SUCCESS);
+ _rels.add(REL_FAILURE);
+ _rels.add(REL_RETRY);
+ relationships = Collections.unmodifiableSet(_rels);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ES_URL);
+ descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+ descriptors.add(USERNAME);
+ descriptors.add(PASSWORD);
+ descriptors.add(CONNECT_TIMEOUT);
+ descriptors.add(RESPONSE_TIMEOUT);
+ descriptors.add(RECORD_READER);
+ descriptors.add(ID_RECORD_PATH);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(INDEX_OP);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> problems = new
ArrayList<>(super.customValidate(validationContext));
+ // Since Expression Language is allowed for index operation, we
can't guarantee that we can catch
+ // all invalid configurations, but we should catch them as soon as
we can. For example, if the
+ // Identifier Record Path property is empty, the Index Operation
must evaluate to "index".
+ String idPath =
validationContext.getProperty(ID_RECORD_PATH).getValue();
+ String indexOp =
validationContext.getProperty(INDEX_OP).getValue();
+
+ if (StringUtils.isEmpty(idPath)) {
+ switch (indexOp.toLowerCase()) {
+ case "update":
+ case "upsert":
+ case "delete":
+ case "":
+ problems.add(new ValidationResult.Builder()
+ .valid(false)
+ .subject(INDEX_OP.getDisplayName())
+ .explanation("If Identifier Record Path is not
set, Index Operation must evaluate to \"index\"")
+ .build());
+ break;
+ default:
+ break;
+ }
+ }
+ return problems;
+ }
+
+ @OnScheduled
+ public void setup(ProcessContext context) {
+ super.setup(context);
+ recordPathCache = new RecordPathCache(10);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+ // Authentication
+ final String username =
context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String password =
context.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
+
+ OkHttpClient okHttpClient = getClient();
+ final ComponentLog logger = getLogger();
+
+ final String baseUrl =
trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
+ final URL url;
+ try {
+ url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl +
"/") + "_bulk");
+ } catch (MalformedURLException mue) {
+ // Since we have a URL validator, something has gone very
wrong, throw a ProcessException
+ context.yield();
+ throw new ProcessException(mue);
+ }
+
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isEmpty(index)) {
+ logger.error("No value for index in for {}, transferring to
failure", new Object[]{flowFile});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ final String docType =
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+ String indexOp =
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isEmpty(indexOp)) {
+ logger.error("No Index operation specified for {},
transferring to failure.", new Object[]{flowFile});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ switch (indexOp.toLowerCase()) {
+ case "index":
+ case "update":
+ case "upsert":
+ case "delete":
+ break;
+ default:
+ logger.error("Index operation {} not supported for {},
transferring to failure.", new Object[]{indexOp, flowFile});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final String id_path =
context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+ final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null
: recordPathCache.getCompiled(id_path);
+ final StringBuilder sb = new StringBuilder();
--- End diff --
Since we are going to be building up this big string of commands, should we
document in the description of the processor that all records in the incoming
flow file will be read into memory, in case someone sends in a flow file with
millions of records?
> Add PutElasticsearchHttpRecord processor
> ----------------------------------------
>
> Key: NIFI-4002
> URL: https://issues.apache.org/jira/browse/NIFI-4002
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> Given the new Record Reader/Writer capabilities, and the fact that the PutElasticsearch
> processors only handle a single document at a time, it would be nice to have
> a "record-aware" PutES processor, where the user could specify a Record
> Reader and for each record in the input, the processor would convert the
> record to JSON and push to an ES cluster. One necessary (but optional)
> property would be a Record Path to a field/value to be used as the document
> identifier (the current PutES processors require an attribute to hold the
> identifier). If the Record Path is not set and the operation is "index", then
> just like the PutES processors, the document identifier will be autogenerated.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)