This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 53809dd  NIFI-9439: - Add PutElasticsearchJson processor to 
Elasticsearch REST bundle - Deprecate 
PutElasticsearchHttp/PutElasticsearchHttpRecord in favour of Elasticsearch 
REST processors
53809dd is described below

commit 53809dd83f1b2e150aaf70a607f6885fa292cbdd
Author: Chris Sampson <[email protected]>
AuthorDate: Fri Dec 3 14:44:44 2021 +0000

    NIFI-9439:
    - Add PutElasticsearchJson processor to Elasticsearch REST bundle
    - Deprecate PutElasticsearchHttp/PutElasticsearchHttpRecord in favour of 
Elasticsearch REST processors
    
    Signed-off-by: Joe Gresock <[email protected]>
    
    This closes #5566.
---
 .../AbstractElasticsearchHttpProcessor.java        |   1 +
 .../elasticsearch/IdentifierNotFoundException.java |   1 +
 .../elasticsearch/PutElasticsearchHttp.java        |   5 +-
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   5 +-
 .../elasticsearch/RetryableException.java          |   1 +
 .../elasticsearch/UnretryableException.java        |   2 +-
 .../elasticsearch/AbstractPutElasticsearch.java    | 165 +++++++++++++
 .../elasticsearch/ElasticsearchRestProcessor.java  |   4 -
 .../processors/elasticsearch/GetElasticsearch.java |   3 +-
 .../elasticsearch/PutElasticsearchJson.java        | 220 +++++++++++++++++
 .../elasticsearch/PutElasticsearchRecord.java      | 138 +++--------
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../additionalDetails.html                         |  44 ++++
 .../additionalDetails.html                         |  11 +-
 .../elasticsearch/PutElasticsearchJsonTest.groovy  | 272 +++++++++++++++++++++
 .../PutElasticsearchRecordTest.groovy              |  14 ++
 16 files changed, 770 insertions(+), 118 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index 8609b0f..fe56e12 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -55,6 +55,7 @@ import javax.net.ssl.X509TrustManager;
 /**
  * A base class for Elasticsearch processors that use the HTTP API
  */
+@Deprecated
 public abstract class AbstractElasticsearchHttpProcessor extends 
AbstractElasticsearchProcessor {
     static final String SOURCE_QUERY_PARAM = "_source";
     static final String QUERY_QUERY_PARAM = "q";
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
index 35402d7..eb61d3c 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/IdentifierNotFoundException.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.elasticsearch;
 /**
  * A domain-specific exception for when a valid Elasticsearch document 
identifier is expected but not found
  */
+@Deprecated
 public class IdentifierNotFoundException extends Exception {
 
     public IdentifierNotFoundException() {
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 5e9bdf3..ab54aa6 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.SystemResource;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -61,7 +62,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-
+@Deprecated
+@DeprecationNotice(classNames = 
{"org.apache.nifi.processors.elasticsearch.PutElasticsearchJson"},
+        reason = "This processor is deprecated and may be removed in future 
releases.")
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @EventDriven
 @SupportsBatching
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 6333be3..97e55ff 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
@@ -87,7 +88,9 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
-
+@Deprecated
+@DeprecationNotice(classNames = 
{"org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord"},
+        reason = "This processor is deprecated and may be removed in future 
releases.")
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @EventDriven
 @Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", 
"put", "http", "record"})
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
index 8e94145..0ecec5d 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.elasticsearch;
 /**
  * Represents a retryable exception from ElasticSearch.
  */
+@Deprecated
 public class RetryableException extends RuntimeException {
 
     private static final long serialVersionUID = -2755015600102381620L;
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
index bae83cf..d6071f7 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
@@ -19,8 +19,8 @@ package org.apache.nifi.processors.elasticsearch;
 /**
  * Represents an unrecoverable error from ElasticSearch.
  * @author jgresock
- *
  */
+@Deprecated
 public class UnretryableException extends RuntimeException {
     private static final long serialVersionUID = -4528006567211380914L;
 
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
new file mode 100644
index 0000000..2ede632
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.elasticsearch.IndexOperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractPutElasticsearch extends AbstractProcessor 
implements ElasticsearchRestProcessor {
+    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("put-es-record-batch-size")
+            .displayName("Batch Size")
+            .description("The preferred number of FlowFiles to send over in a 
single batch.")
+            .defaultValue("100")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
+        .name("put-es-record-index-op")
+        .displayName("Index Operation")
+        .description("The type of the operation used to index (create, delete, 
index, update, upsert)")
+        .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue(IndexOperationRequest.Operation.Index.getValue())
+        .required(true)
+        .build();
+
+    static final List<String> ALLOWED_INDEX_OPERATIONS = 
Collections.unmodifiableList(Arrays.asList(
+            IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
+            IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
+            IndexOperationRequest.Operation.Index.getValue().toLowerCase(),
+            IndexOperationRequest.Operation.Update.getValue().toLowerCase(),
+            IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
+    ));
+
+    boolean logErrors;
+    ObjectMapper errorMapper;
+
+    volatile ElasticSearchClientService clientService;
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+        this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
+
+        if (errorMapper == null && (logErrors || 
getLogger().isDebugEnabled())) {
+            errorMapper = new ObjectMapper();
+            errorMapper.enable(SerializationFeature.INDENT_OUTPUT);
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>();
+
+        final PropertyValue indexOp = validationContext.getProperty(INDEX_OP);
+        final ValidationResult.Builder indexOpValidationResult = new 
ValidationResult.Builder().subject(INDEX_OP.getName());
+        if (!indexOp.isExpressionLanguagePresent()) {
+            final String indexOpValue = 
indexOp.evaluateAttributeExpressions().getValue();
+            indexOpValidationResult.input(indexOpValue);
+            if 
(!ALLOWED_INDEX_OPERATIONS.contains(indexOpValue.toLowerCase())) {
+                indexOpValidationResult.valid(false)
+                        .explanation(String.format("%s must be Expression 
Language or one of %s",
+                                INDEX_OP.getDisplayName(), 
ALLOWED_INDEX_OPERATIONS)
+                        );
+            } else {
+                indexOpValidationResult.valid(true);
+            }
+        } else {
+            
indexOpValidationResult.valid(true).input(indexOp.getValue()).explanation("Expression
 Language present");
+        }
+        validationResults.add(indexOpValidationResult.build());
+
+        return validationResults;
+    }
+
+    void transferFlowFilesOnException(final Exception ex, final Relationship 
rel, final ProcessSession session,
+                                      final boolean penalize, final 
FlowFile... flowFiles) {
+        for (FlowFile flowFile : flowFiles) {
+            flowFile = session.putAttribute(flowFile, 
"elasticsearch.put.error", ex.getMessage() == null ? "null" : ex.getMessage());
+            if (penalize) {
+                session.penalize(flowFile);
+            }
+            session.transfer(flowFile, rel);
+        }
+    }
+
+    void logElasticsearchDocumentErrors(final IndexOperationResponse response) 
throws JsonProcessingException {
+        if (logErrors || getLogger().isDebugEnabled()) {
+            final List<Map<String, Object>> errors = response.getItems();
+            final String output = String.format("An error was encountered 
while processing bulk operations. Server response below:%n%n%s", 
errorMapper.writeValueAsString(errors));
+
+            if (logErrors) {
+                getLogger().error(output);
+            } else {
+                getLogger().debug(output);
+            }
+        }
+    }
+
+    List<Integer> findElasticsearchErrorIndices(final IndexOperationResponse 
response) {
+        final List<Integer> indices = new 
ArrayList<>(response.getItems().size());
+        for (int index = 0; index < response.getItems().size(); index++) {
+            final Map<String, Object> current = response.getItems().get(index);
+            if (!current.isEmpty()) {
+                final String key = 
current.keySet().stream().findFirst().orElse(null);
+                @SuppressWarnings("unchecked") final Map<String, Object> inner 
= (Map<String, Object>) current.get(key);
+                if (inner != null && inner.containsKey("error")) {
+                    indices.add(index);
+                }
+            }
+        }
+        return indices;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
index 886279e..58c447d 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
@@ -104,10 +104,6 @@ public interface ElasticsearchRestProcessor {
             .name("success")
             .description("All flowfiles that succeed in being transferred into 
Elasticsearch go here.")
             .build();
-    Relationship REL_FAILED_RECORDS = new Relationship.Builder()
-            .name("errors").description("If an output record write is set, any 
record that failed to process the way it was " +
-                    "configured will be sent to this relationship as part of a 
failed record record set.")
-            .autoTerminateDefault(true).build();
 
     default String getQuery(final FlowFile input, final ProcessContext 
context, final ProcessSession session) throws IOException {
         String retVal = null;
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
index e416281..ae75d08 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
@@ -49,7 +49,8 @@ import java.util.concurrent.TimeUnit;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
 @Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "put", "index", "record"})
-@CapabilityDescription("Elasticsearch get processor that uses the official 
Elastic REST client libraries. " +
+@CapabilityDescription("Elasticsearch get processor that uses the official 
Elastic REST client libraries " +
+        "to fetch a single document from Elasticsearch by _id. " +
         "Note that the full body of the document will be read into memory 
before being written to a FlowFile for transfer.")
 @WritesAttributes({
         @WritesAttribute(attribute = "filename", description = "The filename 
attribute is set to the document identifier"),
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
new file mode 100644
index 0000000..4614c05
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.elasticsearch.IndexOperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "put", "index"})
+@CapabilityDescription("An Elasticsearch put processor that uses the official 
Elastic REST client libraries.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "elasticsearch.put.error", description = 
"The error message provided by Elasticsearch if there is an error indexing the 
document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing. " +
+                "These parameters will override any matching parameters in the 
_bulk request body. " +
+                "If FlowFiles are batched, only the first FlowFile in the 
batch is used to evaluate property values.")
+@SystemResourceConsideration(
+        resource = SystemResource.MEMORY,
+        description = "The Batch of FlowFiles will be stored in memory until 
the bulk operation is performed.")
+public class PutElasticsearchJson extends AbstractPutElasticsearch {
+    static final PropertyDescriptor ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("put-es-json-id-attr")
+            .displayName("Identifier Attribute")
+            .description("The name of the FlowFile attribute containing the 
identifier for the document. If the Index Operation is \"index\", "
+                    + "this property may be left empty or evaluate to an empty 
value, in which case the document's identifier will be "
+                    + "auto-generated by Elasticsearch. For all other Index 
Operations, the attribute must evaluate to a non-empty value.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+        .name("put-es-json-charset")
+        .displayName("Character Set")
+        .description("Specifies the character set of the document data.")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue(StandardCharsets.UTF_8.name())
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor OUTPUT_ERROR_DOCUMENTS = new 
PropertyDescriptor.Builder()
+        .name("put-es-json-error-documents")
+        .displayName("Output Error Documents")
+        .description("If this configuration property is true, the response 
from Elasticsearch will be examined for failed documents " +
+                "and the failed documents will be sent to the \"errors\" 
relationship.")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .required(true)
+        .build();
+
+    static final Relationship REL_FAILED_DOCUMENTS = new Relationship.Builder()
+            .name("errors").description("If \"" + 
OUTPUT_ERROR_DOCUMENTS.getDisplayName() + "\" is set, " +
+                    "any FlowFile that failed to process the way it was 
configured will be sent to this relationship " +
+                    "as part of a failed document set.")
+            .autoTerminateDefault(true).build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+        ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET, 
CLIENT_SERVICE, LOG_ERROR_RESPONSES, OUTPUT_ERROR_DOCUMENTS
+    ));
+    static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+        REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS
+    )));
+
+    private boolean outputErrors;
+    private final ObjectMapper inputMapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+
+        this.outputErrors = 
context.getProperty(OUTPUT_ERROR_DOCUMENTS).asBoolean();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        final int batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+
+        final List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final String idAttribute = 
context.getProperty(ID_ATTRIBUTE).getValue();
+
+        final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
+        final List<IndexOperationRequest> operations = new 
ArrayList<>(flowFiles.size());
+
+        for (FlowFile input : flowFiles) {
+            final String indexOp = 
context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
+            final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+            final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+            final String id = StringUtils.isNotBlank(idAttribute) ? 
input.getAttribute(idAttribute) : null;
+
+            final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue();
+
+            try (final InputStream inStream = session.read(input)) {
+                final byte[] result = IOUtils.toByteArray(inStream);
+                @SuppressWarnings("unchecked")
+                final Map<String, Object> contentMap = 
inputMapper.readValue(new String(result, charset), Map.class);
+
+                final IndexOperationRequest.Operation o = 
IndexOperationRequest.Operation.forValue(indexOp);
+                operations.add(new IndexOperationRequest(index, type, id, 
contentMap, o));
+
+                originals.add(input);
+            } catch (final IOException ioe) {
+                getLogger().error("Could not read FlowFile content valid 
JSON.", ioe);
+                input = session.putAttribute(input, "elasticsearch.put.error", 
ioe.getMessage());
+                session.penalize(input);
+                session.transfer(input, REL_FAILURE);
+            } catch (final Exception ex) {
+                getLogger().error("Could not index documents.", ex);
+                input = session.putAttribute(input, "elasticsearch.put.error", 
ex.getMessage());
+                session.penalize(input);
+                session.transfer(input, REL_FAILURE);
+            }
+        }
+
+        if (!originals.isEmpty()) {
+            try {
+                final List<FlowFile> errorDocuments = 
indexDocuments(operations, originals, context);
+                session.transfer(errorDocuments, REL_FAILED_DOCUMENTS);
+
+                session.transfer(originals.stream().filter(f -> 
!errorDocuments.contains(f)).collect(Collectors.toList()), REL_SUCCESS);
+            } catch (final ElasticsearchException ese) {
+                final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
+                        ese.isElastic() ? "Routing to retry." : "Routing to 
failure");
+                getLogger().error(msg, ese);
+                final Relationship rel = ese.isElastic() ? REL_RETRY : 
REL_FAILURE;
+                transferFlowFilesOnException(ese, rel, session, true, 
originals.toArray(new FlowFile[0]));
+            } catch (final JsonProcessingException jpe) {
+                getLogger().warn("Could not log Elasticsearch operation errors 
nor determine which documents errored.", jpe);
+                final Relationship rel = outputErrors ? REL_FAILED_DOCUMENTS : 
REL_FAILURE;
+                transferFlowFilesOnException(jpe, rel, session, true, 
originals.toArray(new FlowFile[0]));
+            } catch (final Exception ex) {
+                getLogger().error("Could not index documents.", ex);
+                transferFlowFilesOnException(ex, REL_FAILURE, session, false, 
originals.toArray(new FlowFile[0]));
+                context.yield();
+            }
+        } else {
+            getLogger().warn("No FlowFiles successfully parsed for sending to 
Elasticsearch");
+        }
+    }
+
+    private List<FlowFile> indexDocuments(final List<IndexOperationRequest> 
operations, final List<FlowFile> originals, final ProcessContext context) 
throws JsonProcessingException {
+        final IndexOperationResponse response = clientService.bulk(operations, 
getUrlQueryParameters(context, originals.get(0)));
+        final List<FlowFile> errorDocuments = new 
ArrayList<>(response.getItems() == null ? 0 : response.getItems().size());
+        if (response.hasErrors()) {
+            logElasticsearchDocumentErrors(response);
+
+            if (outputErrors) {
+                findElasticsearchErrorIndices(response).forEach(index -> 
errorDocuments.add(originals.get(index)));
+            }
+        }
+        return errorDocuments;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index fe058c0..0a1fd31 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -17,27 +17,22 @@
 
 package org.apache.nifi.processors.elasticsearch;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
-import org.apache.nifi.elasticsearch.ElasticSearchClientService;
 import org.apache.nifi.elasticsearch.ElasticsearchException;
 import org.apache.nifi.elasticsearch.IndexOperationRequest;
 import org.apache.nifi.elasticsearch.IndexOperationResponse;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -49,6 +44,7 @@ import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathResult;
 import org.apache.nifi.record.path.util.RecordPathCache;
 import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
@@ -62,11 +58,11 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -86,7 +82,10 @@ import java.util.Set;
         expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
         description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing. " +
                 "These parameters will override any matching parameters in the 
_bulk request body")
-public class PutElasticsearchRecord extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+@SystemResourceConsideration(
+        resource = SystemResource.MEMORY,
+        description = "The Batch of Records will be stored in memory until the 
bulk operation is performed.")
+public class PutElasticsearchRecord extends AbstractPutElasticsearch {
     static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
         .name("put-es-record-reader")
         .displayName("Record Reader")
@@ -96,23 +95,9 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
         .build();
 
     static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
-        .name("put-es-record-batch-size")
-        .displayName("Batch Size")
+        .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
         .description("The number of records to send over in a single batch.")
-        .defaultValue("100")
-        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-        .required(true)
-        .build();
-
-    static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
-        .name("put-es-record-index-op")
-        .displayName("Index Operation")
-        .description("The type of the operation used to index (create, delete, 
index, update, upsert)")
-        .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
-        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-        .defaultValue(IndexOperationRequest.Operation.Index.getValue())
-        .required(true)
         .build();
 
     static final PropertyDescriptor AT_TIMESTAMP = new 
PropertyDescriptor.Builder()
@@ -245,6 +230,11 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
         .required(false)
         .build();
 
+    static final Relationship REL_FAILED_RECORDS = new Relationship.Builder()
+            .name("errors").description("If an Error Record Writer is set, any 
record that failed to process the way it was " +
+                    "configured will be sent to this relationship as part of a 
failed record set.")
+            .autoTerminateDefault(true).build();
+
     static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
         INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, 
BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
         INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, 
AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
@@ -254,21 +244,10 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
         REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
     )));
 
-    static final List<String> ALLOWED_INDEX_OPERATIONS = 
Collections.unmodifiableList(Arrays.asList(
-            IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
-            IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
-            IndexOperationRequest.Operation.Index.getValue().toLowerCase(),
-            IndexOperationRequest.Operation.Update.getValue().toLowerCase(),
-            IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
-    ));
-
     private RecordPathCache recordPathCache;
     private RecordReaderFactory readerFactory;
     private RecordSetWriterFactory writerFactory;
-    private boolean logErrors;
-    private ObjectMapper errorMapper;
 
-    private volatile ElasticSearchClientService clientService;
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
@@ -283,24 +262,13 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
         return DESCRIPTORS;
     }
 
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .required(false)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-                .dynamic(true)
-                .build();
-    }
-
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+
         this.readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        this.clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
         this.recordPathCache = new RecordPathCache(16);
         this.writerFactory = 
context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
 
         this.dateFormat = 
context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
         if (this.dateFormat == null) {
@@ -314,36 +282,6 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
         if (this.timestampFormat == null) {
             this.timestampFormat = 
RecordFieldType.TIMESTAMP.getDefaultFormat();
         }
-
-        if (errorMapper == null && (logErrors || 
getLogger().isDebugEnabled())) {
-            errorMapper = new ObjectMapper();
-            errorMapper.enable(SerializationFeature.INDENT_OUTPUT);
-        }
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final List<ValidationResult> validationResults = new ArrayList<>();
-
-        final PropertyValue indexOp = validationContext.getProperty(INDEX_OP);
-        final ValidationResult.Builder indexOpValidationResult = new 
ValidationResult.Builder().subject(INDEX_OP.getName());
-        if (!indexOp.isExpressionLanguagePresent()) {
-            final String indexOpValue = 
indexOp.evaluateAttributeExpressions().getValue();
-            indexOpValidationResult.input(indexOpValue);
-            if 
(!ALLOWED_INDEX_OPERATIONS.contains(indexOpValue.toLowerCase())) {
-                indexOpValidationResult.valid(false)
-                        .explanation(String.format("%s must be Expression 
Language or one of %s",
-                                INDEX_OP.getDisplayName(), 
ALLOWED_INDEX_OPERATIONS)
-                        );
-            } else {
-                indexOpValidationResult.valid(true);
-            }
-        } else {
-            
indexOpValidationResult.valid(true).input(indexOp.getValue()).explanation("Expression
 Language present");
-        }
-        validationResults.add(indexOpValidationResult.build());
-
-        return validationResults;
     }
 
     @Override
@@ -423,15 +361,18 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
                     ese.isElastic() ? "Routing to retry." : "Routing to 
failure");
             getLogger().error(msg, ese);
             final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
-            session.penalize(input);
-            input = session.putAttribute(input, "elasticsearch.put.error", 
ese.getMessage());
-            session.transfer(input, rel);
+            transferFlowFilesOnException(ese, rel, session, true, input);
+            removeBadRecordFlowFiles(badRecords, session);
+            return;
+        } catch (final IOException | SchemaNotFoundException ex) {
+            getLogger().warn("Could not log Elasticsearch operation errors nor 
determine which documents errored.", ex);
+            final Relationship rel = writerFactory != null ? 
REL_FAILED_RECORDS : REL_FAILURE;
+            transferFlowFilesOnException(ex, rel, session, true, input);
             removeBadRecordFlowFiles(badRecords, session);
             return;
         } catch (final Exception ex) {
             getLogger().error("Could not index documents.", ex);
-            input = session.putAttribute(input, "elasticsearch.put.error", 
ex.getMessage());
-            session.transfer(input, REL_FAILURE);
+            transferFlowFilesOnException(ex, REL_FAILURE, session, false, 
input);
             context.yield();
             removeBadRecordFlowFiles(badRecords, session);
             return;
@@ -447,19 +388,10 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
         bad.clear();
     }
 
-    private FlowFile indexDocuments(final BulkOperation bundle, final 
ProcessContext context, final ProcessSession session, final FlowFile input) 
throws Exception {
+    private FlowFile indexDocuments(final BulkOperation bundle, final 
ProcessContext context, final ProcessSession session, final FlowFile input) 
throws IOException, SchemaNotFoundException {
         final IndexOperationResponse response = 
clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context, 
input));
         if (response.hasErrors()) {
-            if (logErrors || getLogger().isDebugEnabled()) {
-                final List<Map<String, Object>> errors = response.getItems();
-                final String output = String.format("An error was encountered 
while processing bulk operations. Server response below:%n%n%s", 
errorMapper.writeValueAsString(errors));
-
-                if (logErrors) {
-                    getLogger().error(output);
-                } else {
-                    getLogger().debug(output);
-                }
-            }
+            logElasticsearchDocumentErrors(response);
 
             if (writerFactory != null) {
                 FlowFile errorFF = session.create(input);
@@ -469,17 +401,9 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
                          final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
 
                         writer.beginRecordSet();
-                        for (int index = 0; index < 
response.getItems().size(); index++) {
-                            final Map<String, Object> current = 
response.getItems().get(index);
-                            if (!current.isEmpty()) {
-                                final String key = 
current.keySet().stream().findFirst().orElse(null);
-                                @SuppressWarnings("unchecked")
-                                final Map<String, Object> inner = (Map<String, 
Object>) current.get(key);
-                                if (inner != null && 
inner.containsKey("error")) {
-                                    
writer.write(bundle.getOriginalRecords().get(index));
-                                    added++;
-                                }
-                            }
+                        for (final int index : 
findElasticsearchErrorIndices(response)) {
+                            
writer.write(bundle.getOriginalRecords().get(index));
+                            added++;
                         }
                         writer.finishRecordSet();
                     }
@@ -489,8 +413,8 @@ public class PutElasticsearchRecord extends 
AbstractProcessor implements Elastic
                     session.transfer(errorFF, REL_FAILED_RECORDS);
 
                     return errorFF;
-                } catch (final Exception ex) {
-                    getLogger().error("", ex);
+                } catch (final IOException | SchemaNotFoundException ex) {
+                    getLogger().error("Unable to write error records", ex);
                     session.remove(errorFF);
                     throw ex;
                 }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 69bbf40..a22d16a 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,5 +18,7 @@ 
org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.SearchElasticsearch
 org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
+org.apache.nifi.processors.elasticsearch.PutElasticsearchJson
 org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch
 org.apache.nifi.processors.elasticsearch.GetElasticsearch
+
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
new file mode 100644
index 0000000..403cb7d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchJson/additionalDetails.html
@@ -0,0 +1,44 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutElasticsearchJson</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+</head>
+<body>
+<p>
+    This processor is for accessing the Elasticsearch Bulk API. It provides 
the ability to configure bulk operations on
+    a per-FlowFile basis, which is what separates it from 
PutElasticsearchRecord.
+</p>
+<p>
+    As part of the Elasticsearch REST API bundle, it uses a controller service 
to manage connection information and
+    that controller service is built on top of the official Elasticsearch 
client APIs. That provides features such as
+    automatic master detection against the cluster which is missing in the 
other bundles.
+</p>
+<p>
+    This processor builds one Elasticsearch Bulk API body per (batch of) 
FlowFiles. Care should be taken to batch FlowFiles
+    into appropriately-sized chunks so that NiFi does not run out of memory 
and the requests sent to Elasticsearch are
+    not too large for it to handle. When failures do occur, this processor is 
capable of attempting to route the FlowFiles
+    that failed to an errors queue so that only failed FlowFiles can be 
processed downstream or replayed.
+</p>
+<p>
+    The index, operation and (optional) type fields are configured with 
default values.
+    The ID (optional only when the operation is "index") can be set as an 
attribute on the FlowFile(s).
+    The content of each FlowFile must be valid JSON and supplies the document for the bulk operation.
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
index 76e862e..18da9cb 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html
@@ -22,7 +22,7 @@
 <body>
 <p>
     This processor is for accessing the Elasticsearch Bulk API. It provides 
the ability to configure bulk operations on
-    a per-record basis which is what separates it from 
PutElasticsearchHttpRecord. For example, it is possible to define
+    a per-record basis which is what separates it from PutElasticsearchJson. 
For example, it is possible to define
     multiple commands to index documents, followed by deletes, creates and 
update operations against the same index or
     other indices as desired.
 </p>
@@ -41,7 +41,10 @@
     The index, operation and (optional) type fields are configured with 
default values that can be overridden using
     record path operations that find an index or type value in the record set.
     The ID and operation type (create, index, update, upsert or delete) can 
also be extracted in a similar fashion from
-    the record set. The following is an example of a document exercising all 
of these features:
+    the record set.
+    An "@timestamp" field can be added to the data either using a default or 
by extracting it from the record set.
+    This is useful if the documents are being indexed into an Elasticsearch 
Data Stream.
+    The following is an example of a document exercising all of these features:
 </p>
 <pre>
     {
@@ -52,7 +55,8 @@
             "operation": "index"
         },
         "message": "Hello, world",
-        "from": "john.smith"
+        "from": "john.smith",
+        "ts": "2021-12-03T14:00:00.000Z"
     }
 </pre>
 <pre>
@@ -71,6 +75,7 @@
     <li>/metadata/index</li>
     <li>metadata/type</li>
     <li>metadata/operation</li>
+    <li>/ts</li>
 </ul>
 <p>Valid values for "operation" are:</p>
 <ul>
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
new file mode 100644
index 0000000..ab979fe
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+
+import static org.hamcrest.CoreMatchers.containsString
+import static org.hamcrest.MatcherAssert.assertThat
+
+class PutElasticsearchJsonTest {
+    MockBulkLoadClientService clientService
+    TestRunner runner
+
+    static final String flowFileContents = prettyPrint(toJson(
+            [ msg: "Hello, world", from: "john.smith" ]
+    ))
+
+    @Before
+    void setup() {
+        clientService = new MockBulkLoadClientService()
+        runner   = TestRunners.newTestRunner(PutElasticsearchJson.class)
+
+        clientService.response = new IndexOperationResponse(1500)
+
+        runner.addControllerService("clientService", clientService)
+        runner.setProperty(PutElasticsearchJson.ID_ATTRIBUTE, "doc_id")
+        runner.setProperty(PutElasticsearchJson.INDEX_OP, 
IndexOperationRequest.Operation.Index.getValue())
+        runner.setProperty(PutElasticsearchJson.INDEX, "test_index")
+        runner.setProperty(PutElasticsearchJson.TYPE, "test_type")
+        runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "1")
+        runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, 
"false")
+        runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false")
+        runner.setProperty(PutElasticsearchJson.CLIENT_SERVICE, 
"clientService")
+        runner.enableControllerService(clientService)
+
+        runner.assertValid()
+    }
+
+    void basicTest(int failure, int retry, int success) {
+        def evalClosure = { List<IndexOperationRequest> items ->
+            int nullIdCount = items.findAll { it.id == null }.size()
+            int indexCount = items.findAll { it.index == "test_index" }.size()
+            int typeCount = items.findAll { it.type == "test_type" }.size()
+            int opCount = items.findAll { it.operation == 
IndexOperationRequest.Operation.Index }.size()
+            Assert.assertEquals(1, nullIdCount)
+            Assert.assertEquals(1, indexCount)
+            Assert.assertEquals(1, typeCount)
+            Assert.assertEquals(1, opCount)
+        }
+
+        basicTest(failure, retry, success, evalClosure)
+    }
+
+    void basicTest(int failure, int retry, int success, Closure evalClosure) {
+        clientService.evalClosure = evalClosure
+
+        basicTest(failure, retry, success, null)
+    }
+
+    void basicTest(int failure, int retry, int success, Map<String, String> 
attr) {
+        if (attr != null) {
+            runner.enqueue(flowFileContents, attr)
+        } else {
+            runner.enqueue(flowFileContents)
+        }
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, failure)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, retry)
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, success)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+    }
+
+    @Test
+    void simpleTest() {
+        def evalParametersClosure = { Map<String, String> params ->
+            Assert.assertTrue(params.isEmpty())
+        }
+        clientService.evalParametersClosure = evalParametersClosure
+
+        basicTest(0, 0, 1)
+    }
+
+    @Test
+    void simpleTestWithDocIdAndRequestParameters() {
+        runner.setProperty("refresh", "true")
+        runner.setProperty("slices", '${slices}')
+        runner.setVariable("slices", "auto")
+        runner.assertValid()
+
+        def evalParametersClosure = { Map<String, String> params ->
+            Assert.assertEquals(2, params.size())
+            Assert.assertEquals("true", params.get("refresh"))
+            Assert.assertEquals("auto", params.get("slices"))
+        }
+
+        clientService.evalParametersClosure = evalParametersClosure
+
+        def evalClosure = { List<IndexOperationRequest> items ->
+            int idCount = items.findAll { it.id == "123" }.size()
+            int indexCount = items.findAll { it.index == "test_index" }.size()
+            int typeCount = items.findAll { it.type == "test_type" }.size()
+            int opCount = items.findAll { it.operation == 
IndexOperationRequest.Operation.Index }.size()
+            Assert.assertEquals(1, idCount)
+            Assert.assertEquals(1, indexCount)
+            Assert.assertEquals(1, typeCount)
+            Assert.assertEquals(1, opCount)
+        }
+
+        clientService.evalClosure = evalClosure
+
+        basicTest(0, 0, 1, [doc_id: "123"])
+    }
+
+    @Test
+    void simpleTestWithRequestParametersFlowFileEL() {
+        runner.setProperty("refresh", "true")
+        runner.setProperty("slices", '${slices}')
+        runner.assertValid()
+
+        def evalParametersClosure = { Map<String, String> params ->
+            Assert.assertEquals(2, params.size())
+            Assert.assertEquals("true", params.get("refresh"))
+            Assert.assertEquals("auto", params.get("slices"))
+        }
+
+        clientService.evalParametersClosure = evalParametersClosure
+
+        basicTest(0, 0, 1, [slices: "auto"])
+    }
+
+    @Test
+    void testFatalError() {
+        clientService.throwFatalError = true
+        basicTest(1, 0, 0)
+    }
+
+    @Test
+    void testRetriable() {
+        clientService.throwRetriableError = true
+        basicTest(0, 1, 0)
+    }
+
+    @Test
+    void testInvalidIndexOperation() {
+        runner.setProperty(PutElasticsearchJson.INDEX_OP, "not-valid")
+        runner.assertNotValid()
+        final AssertionError ae = Assert.assertThrows(AssertionError.class, 
runner.&run)
+        Assert.assertEquals(String.format("Processor has 1 validation 
failures:\n'%s' validated against 'not-valid' is invalid because %s must be 
Expression Language or one of %s\n",
+                PutElasticsearchJson.INDEX_OP.getName(), 
PutElasticsearchJson.INDEX_OP.getDisplayName(), 
PutElasticsearchJson.ALLOWED_INDEX_OPERATIONS),
+                ae.getMessage()
+        )
+
+        runner.setProperty(PutElasticsearchJson.INDEX_OP, "\${operation}")
+        runner.assertValid()
+        runner.enqueue(flowFileContents, [
+                "operation": "not-valid2"
+        ])
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+    }
+
+    @Test
+    void testInputRequired() {
+        runner.run()
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+    }
+
+    @Test
+    void testBatchingAndErrorRelationship() {
+        runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, "true")
+        runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "true")
+        runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "100")
+
+        clientService.response = 
IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
+
+        def values = [
+                [ id: "1", field1: 'value1', field2: '20' ],
+                [ id: "2", field1: 'value1', field2: '20' ],
+                [ id: "2", field1: 'value1', field2: '20' ],
+                [ id: "3", field1: 'value1', field2: '20abcd' ]
+        ]
+
+        for (final def val : values) {
+            runner.enqueue(prettyPrint(toJson(val)))
+        }
+        runner.assertValid()
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 3)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
1)
+        
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(),
 containsString("20abcd"))
+    }
+
+    @Test
+    void testBatchingAndNoErrorOutput() {
+        runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_DOCUMENTS, 
"false")
+        runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false")
+        runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "100")
+
+        clientService.response = 
IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
+
+        def values = [
+                [ id: "1", field1: 'value1', field2: '20' ],
+                [ id: "2", field1: 'value1', field2: '20' ],
+                [ id: "2", field1: 'value1', field2: '20' ],
+                [ id: "3", field1: 'value1', field2: '20abcd' ]
+        ]
+
+        for (final def val : values) {
+            runner.enqueue(prettyPrint(toJson(val)))
+        }
+        runner.assertValid()
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 4)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+    }
+
+    @Test
+    void testInvalidInput() {
+        runner.enqueue("not-json")
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
+        runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+
+        
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE)[0].assertAttributeEquals(
+                "elasticsearch.put.error",
+                "Unrecognized token 'not': was expecting (JSON String, Number, 
Array, Object or token 'null', 'true' or 'false')\n" +
+                " at [Source: (String)\"not-json\"; line: 1, column: 4]"
+        )
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 2bbd51c..5a2c32d 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -135,6 +135,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, failure)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, retry)
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
@@ -291,6 +292,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -344,6 +346,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -384,6 +387,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -421,6 +425,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -441,6 +446,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
@@ -515,6 +521,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
 
         runner.clearTransferState()
 
@@ -567,6 +574,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
@@ -588,12 +596,16 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 1)
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
     void testInputRequired() {
         runner.run()
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
     }
 
     @Test
@@ -631,6 +643,8 @@ class PutElasticsearchRecordTest {
         runner.run()
 
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1)
 
         def errorFF = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]

Reply via email to