[
https://issues.apache.org/jira/browse/NIFI-5248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598744#comment-16598744
]
ASF GitHub Bot commented on NIFI-5248:
--------------------------------------
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2861#discussion_r214358747
--- Diff:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticSearchError;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"json", "elasticsearch", "5x", "6x", "put", "index", "record"})
+@CapabilityDescription("A record-aware ElasticSearch put processor that
uses the official Elastic REST client libraries.")
+public class PutElasticsearchRecord extends AbstractProcessor implements
ElasticSearchRestProcessor {
// Controller-service property: the RecordReaderFactory used to deserialize each
// incoming FlowFile's content into Records before they are turned into bulk
// index operations.
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("put-es-record-reader")
        .displayName("Record Reader")
        .description("The record reader to use for reading incoming records from flowfiles.")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        // NOTE(review): identifiesControllerService() already implies validation;
        // Validator.VALID is presumably here to silence the "no validator" default — confirm.
        .addValidator(Validator.VALID)
        .build();
+
+ static final PropertyDescriptor OPERATION_RECORD_PATH = new
PropertyDescriptor.Builder()
+ .name("put-es-record-operation-path")
+ .displayName("A record path expression to retrieve index operation
setting from each record. If left blank, " +
+ "all operations will be assumed to be index operations.")
+ .addValidator(Validator.VALID)
+ .required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ static final PropertyDescriptor ID_RECORD_PATH = new
PropertyDescriptor.Builder()
+ .name("put-es-record-id-path")
+ .displayName("ID Record Path")
+ .description("A record path expression to retrieve the ID field
for use with ElasticSearch. If left blank " +
+ "the ID will be automatically generated by ElasticSearch.")
+ .addValidator(Validator.VALID)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ static final PropertyDescriptor INDEX_RECORD_PATH = new
PropertyDescriptor.Builder()
+ .name("put-es-record-index-record-path")
+ .displayName("Index Record Path")
+ .description("A record path expression to retrieve the ID field
for use with ElasticSearch. If left blank " +
+ "the index will be determined using the main index
property.")
+ .addValidator(Validator.VALID)
+ .required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
// Optional record path that resolves the ElasticSearch document type per record;
// when unset, the processor's main type property is used instead.
static final PropertyDescriptor TYPE_RECORD_PATH = new PropertyDescriptor.Builder()
        .name("put-es-record-type-record-path")
        .displayName("Type Record Path")
        .description("A record path expression to retrieve the type field for use with ElasticSearch. If left blank " +
                "the type will be determined using the main type property.")
        .addValidator(Validator.VALID)
        .required(false)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .build();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return new HashSet<Relationship>(){{
+ add(REL_SUCCESS);
+ add(REL_FAILURE);
+ add(REL_RETRY);
+ }};
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ List<PropertyDescriptor> _temp = new ArrayList<>();
+ _temp.add(INDEX);
+ _temp.add(TYPE);
+ _temp.add(CLIENT_SERVICE);
+ _temp.add(RECORD_READER);
+ _temp.add(OPERATION_RECORD_PATH);
+ _temp.add(ID_RECORD_PATH);
+ _temp.add(INDEX_RECORD_PATH);
+ _temp.add(TYPE_RECORD_PATH);
+
+ return Collections.unmodifiableList(_temp);
+ }
+
// All three are initialized in onScheduled() and read from onTrigger();
// NiFi schedules onScheduled before any onTrigger call, so they are
// presumably never observed null — confirm against the framework contract.
private RecordReaderFactory readerFactory;          // deserializes FlowFile content into Records
private RecordPathCache recordPathCache;            // caches compiled RecordPath expressions
private ElasticSearchClientService clientService;   // REST client used for the bulk request
+
/**
 * Captures the configured controller services and builds the record-path cache
 * once per schedule, so onTrigger does not resolve them per FlowFile.
 *
 * @param context provides access to the processor's configured properties
 */
@OnScheduled
public void onScheduled(ProcessContext context) {
    this.readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    // Caches up to 16 compiled record paths. NOTE(review): the capacity is
    // hard-coded — looks sufficient for the 4 path properties here, but confirm
    // whether it should be configurable.
    this.recordPathCache = new RecordPathCache(16);
}
+
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ FlowFile input = session.get();
+ if (input == null) {
+ return;
+ }
+
+ try {
+ List<IndexOperationRequest> bundle = buildOperations(input,
context, session);
+ clientService.bulk(bundle);
+
+ session.transfer(input, REL_SUCCESS);
+ } catch (ElasticSearchError ese) {
+ String msg = String.format("Encountered a server-side problem
with ElasticSearch. %s",
--- End diff --
I think I'll keep that one as-is because it's cleanly structured.
> Create new put processors that use the ElasticSearch client service
> -------------------------------------------------------------------
>
> Key: NIFI-5248
> URL: https://issues.apache.org/jira/browse/NIFI-5248
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Mike Thomsen
> Assignee: Mike Thomsen
> Priority: Major
>
> Two new processors:
> * PutElasticsearchJson - put raw JSON.
> * PutElasticsearchRecord - put records.
> Both of them should support the general bulk load API and be able to do
> things like insert into multiple indexes from one payload.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)