[
https://issues.apache.org/jira/browse/NIFI-4002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033694#comment-16033694
]
ASF GitHub Bot commented on NIFI-4002:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1878#discussion_r119728626
--- Diff:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
---
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.commons.lang3.StringUtils.trimToEmpty;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write",
"put", "http", "record"})
+@CapabilityDescription("Writes the contents of a FlowFile to
Elasticsearch, using the specified parameters such as "
+ + "the index to insert into and the type of the document.")
+public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcessor {
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("All FlowFiles that are written to Elasticsearch
are routed to this relationship").build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("All FlowFiles that cannot be written to
Elasticsearch are routed to this relationship").build();
+
+ public static final Relationship REL_RETRY = new
Relationship.Builder().name("retry")
+ .description("A FlowFile is routed to this relationship if the
database cannot be updated but attempting the operation again may succeed")
+ .build();
+
+ static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("put-db-record-record-reader")
--- End diff --
Yeah definitely, just a copy-paste thing :)
> Add PutElasticsearchHttpRecord processor
> ----------------------------------------
>
> Key: NIFI-4002
> URL: https://issues.apache.org/jira/browse/NIFI-4002
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> With the new Record Reader/Writer capabilities, and given that the PutElasticsearch
> processors only handle a single document at a time, it would be nice to have
> a "record-aware" PutES processor, where the user could specify a Record
> Reader and, for each record in the input, the processor would convert the
> record to JSON and push it to an ES cluster. One necessary (but optional)
> property would be a Record Path to a field/value to be used as the document
> identifier (the current PutES processors require an attribute to hold the
> identifier). If the Record Path is not set and the operation is "index", then
> just like the PutES processors, the document identifier will be autogenerated.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)