This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9cfca8580c NIFI-14244 Send original Records to errors output for 
PutElasticsearchRecord errors (#9706)
9cfca8580c is described below

commit 9cfca8580c2af28fe2c1b78c9e3fb1f7fda30cf3
Author: Chris Sampson <[email protected]>
AuthorDate: Sat Mar 1 17:43:39 2025 +0000

    NIFI-14244 Send original Records to errors output for 
PutElasticsearchRecord errors (#9706)
    
    Signed-off-by: David Handermann <[email protected]>
---
 nifi-docs/src/main/asciidoc/user-guide.adoc        |   6 +-
 .../integration/AbstractElasticsearch_IT.java      |   2 +-
 .../elasticsearch/AbstractPutElasticsearch.java    |   6 +-
 .../elasticsearch/PutElasticsearchRecord.java      |  64 +++++---
 .../elasticsearch/PutElasticsearchJsonTest.java    |   4 +-
 .../elasticsearch/PutElasticsearchRecordTest.java  |  40 ++++-
 .../integration/AbstractElasticsearch_IT.java      |   6 +-
 .../integration/PutElasticsearchRecord_IT.java     | 176 +++++++++++++++++++++
 .../mock/AbstractMockElasticsearchClient.java      |  10 +-
 .../mock/MockBulkLoadClientService.java            |   2 +-
 .../integration/AbstractElasticsearchITBase.java   |  32 ++--
 .../nifi-elasticsearch-bundle/pom.xml              |   4 +-
 .../org/apache/nifi/util/MockProcessSession.java   |  21 ++-
 .../org/apache/nifi/util/MockSessionFactory.java   |   6 +-
 .../nifi/util/StandardProcessorTestRunner.java     |  13 +-
 .../main/java/org/apache/nifi/util/TestRunner.java |  12 +-
 16 files changed, 323 insertions(+), 81 deletions(-)

diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc 
b/nifi-docs/src/main/asciidoc/user-guide.adoc
index a2e5fdf872..18e75e8905 100644
--- a/nifi-docs/src/main/asciidoc/user-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/user-guide.adoc
@@ -695,9 +695,9 @@ In order for a Processor to be considered valid and able to 
run, each Relationsh
 
 ===== Automatically Retry
 Users can also configure whether or not FlowFiles routed to a given 
Relationship should be retried. If a FlowFile is routed to any Relationship 
that is configured to be retried,
-the FlowFile will be re-queued and the Processor will attempt to process it 
again. If the Processor routes the FlowFile to a retriable Relationship again 
(either the same Relationship
+the FlowFile will be re-queued and the Processor will attempt to process it 
again. If the Processor routes the FlowFile to a retryable Relationship again 
(either the same Relationship
 or another that is configured to be retried), it will be re-queued again, up 
to the number of
-times specified by the user. If the Processor routes the FlowFile to a 
retriable Relationship after the specified number of retries, the FlowFile will 
be transferred to
+times specified by the user. If the Processor routes the FlowFile to a 
retryable Relationship after the specified number of retries, the FlowFile will 
be transferred to
 the Connection(s) that include that Relationship - or auto-terminated, as 
configured.
 If the Processor routes the FlowFile to any Relationship that is not 
configured to be retried, it will be routed to that Relationship immediately.
 
@@ -705,7 +705,7 @@ For example, consider a Processor with two relationships: 
`success` and `failure
 A user configures the `failure` Relationship to retry 10 times and also be 
configured to auto-terminate. In this
 case, if an incoming FlowFile is routed to the `failure` Relationship,
 it will be retried up to 10 times. After 10 attempts, if it is routed to 
`failure` again, it will be auto-terminated. However, if at any point it is
-routed to `success`, it will immediatley be transferred to the Connection(s) 
that include the `success` Relationship and not retried any further.
+routed to `success`, it will immediately be transferred to the Connection(s) 
that include the `success` Relationship and not retried any further.
 
 ====== Number of Retry Attempts
 For relationships set to retry, this number indicates how many times a 
FlowFile will attempt to reprocess before it is routed elsewhere.
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
index 064e88ce6b..434c830de7 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
@@ -65,6 +65,6 @@ abstract class AbstractElasticsearch_IT extends 
AbstractElasticsearchITBase {
     @AfterAll
     static void afterAll() {
         tearDownTestData(TEST_INDICES);
-        stopTestcontainer();
+        stopTestContainer();
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
index fa3c1ca3c8..2956173baf 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -60,12 +60,12 @@ public abstract class AbstractPutElasticsearch extends 
AbstractProcessor impleme
             .description("All flowfiles that are sent to Elasticsearch without 
request failures go to this relationship.")
             .build();
 
-    static final Relationship REL_SUCCESSFUL = new Relationship.Builder()
+    public static final Relationship REL_SUCCESSFUL = new 
Relationship.Builder()
             .name("successful")
             .description("Record(s)/Flowfile(s) corresponding to Elasticsearch 
document(s) that did not result in an \"error\" (within Elasticsearch) will be 
routed here.")
             .build();
 
-    static final Relationship REL_ERRORS = new Relationship.Builder()
+    public static final Relationship REL_ERRORS = new Relationship.Builder()
             .name("errors")
             .description("Record(s)/Flowfile(s) corresponding to Elasticsearch 
document(s) that resulted in an \"error\" (within Elasticsearch) will be routed 
here.")
             .build();
@@ -96,7 +96,7 @@ public abstract class AbstractPutElasticsearch extends 
AbstractProcessor impleme
             .required(true)
             .build();
 
-    static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor INDEX_OP = new 
PropertyDescriptor.Builder()
             .name("put-es-record-index-op")
             .displayName("Index Operation")
             .description("The type of the operation used to index (create, 
delete, index, update, upsert)")
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 0c6aab182e..507f031511 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -118,7 +118,7 @@ import java.util.concurrent.atomic.AtomicLong;
         resource = SystemResource.MEMORY,
         description = "The Batch of Records will be stored in memory until the 
bulk operation is performed.")
 public class PutElasticsearchRecord extends AbstractPutElasticsearch {
-    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
         .name("put-es-record-reader")
         .displayName("Record Reader")
         .description("The record reader to use for reading incoming records 
from flowfiles.")
@@ -126,7 +126,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         .required(true)
         .build();
 
-    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
         .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
         .description("The number of records to send over in a single batch.")
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -151,7 +151,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor ID_RECORD_PATH = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor ID_RECORD_PATH = new 
PropertyDescriptor.Builder()
         .name("put-es-record-id-path")
         .displayName("ID Record Path")
         .description("A record path expression to retrieve the ID field for 
use with Elasticsearch. If left blank " +
@@ -161,7 +161,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor RETAIN_ID_FIELD = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor RETAIN_ID_FIELD = new 
PropertyDescriptor.Builder()
         .name("put-es-record-retain-id-field")
         .displayName("Retain ID (Record Path)")
         .description("Whether to retain the existing field used as the ID 
Record Path.")
@@ -201,7 +201,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor SCRIPT_RECORD_PATH = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor SCRIPT_RECORD_PATH = new 
PropertyDescriptor.Builder()
             .name("put-es-record-script-path")
             .displayName("Script Record Path")
             .description("A RecordPath pointing to a field in the record(s) 
that contains the script for the document update/upsert. " +
@@ -211,7 +211,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
-    static final PropertyDescriptor SCRIPTED_UPSERT_RECORD_PATH = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor SCRIPTED_UPSERT_RECORD_PATH = new 
PropertyDescriptor.Builder()
             .name("put-es-record-scripted-upsert-path")
             .displayName("Scripted Upsert Record Path")
             .description("A RecordPath pointing to a field in the record(s) 
that contains the scripted_upsert boolean flag. " +
@@ -247,7 +247,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor RESULT_RECORD_WRITER = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor RESULT_RECORD_WRITER = new 
PropertyDescriptor.Builder()
         .name("put-es-record-error-writer")
         .displayName("Result Record Writer")
         .description("The response from Elasticsearch will be examined for 
failed records " +
@@ -399,7 +399,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
 
         final List<FlowFile> resultRecords = new ArrayList<>();
 
-        final AtomicLong erroredRecords = new AtomicLong(0);
+        final AtomicLong errorRecords = new AtomicLong(0);
         final AtomicLong successfulRecords = new AtomicLong(0);
         final StopWatch stopWatch = new StopWatch(true);
         final Set<String> indices = new HashSet<>();
@@ -418,13 +418,13 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
                 originals.add(record);
 
                 if (operationList.size() == 
indexOperationParameters.getBatchSize() || !recordSet.isAnotherRecord()) {
-                    operate(operationList, originals, reader, session, input, 
indexOperationParameters.getRequestParameters(), resultRecords, erroredRecords, 
successfulRecords);
+                    operate(operationList, originals, reader, session, input, 
indexOperationParameters, resultRecords, errorRecords, successfulRecords, 
batches);
                     batches++;
                 }
             }
 
             if (!operationList.isEmpty()) {
-                operate(operationList, originals, reader, session, input, 
indexOperationParameters.getRequestParameters(), resultRecords, erroredRecords, 
successfulRecords);
+                operate(operationList, originals, reader, session, input, 
indexOperationParameters, resultRecords, errorRecords, successfulRecords, 
batches);
                 batches++;
             }
         } catch (final ElasticsearchException ese) {
@@ -452,12 +452,12 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         session.getProvenanceReporter().send(
                 input,
                 clientService.get().getTransitUrl(String.join(",", indices), 
types.isEmpty() ? null : String.join(",", types)),
-                String.format(Locale.getDefault(), "%d Elasticsearch _bulk 
operation batch(es) [%d error(s), %d success(es)]", batches, 
erroredRecords.get(), successfulRecords.get()),
+                String.format(Locale.getDefault(), "%d Elasticsearch _bulk 
operation batch(es) [%d error(s), %d success(es)]", batches, 
errorRecords.get(), successfulRecords.get()),
                 stopWatch.getDuration(TimeUnit.MILLISECONDS)
         );
 
         input = session.putAllAttributes(input, new HashMap<>() {{
-            put("elasticsearch.put.error.count", 
String.valueOf(erroredRecords.get()));
+            put("elasticsearch.put.error.count", 
String.valueOf(errorRecords.get()));
             put("elasticsearch.put.success.count", 
String.valueOf(successfulRecords.get()));
         }});
 
@@ -498,12 +498,12 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
     }
 
     private void operate(final List<IndexOperationRequest> operationList, 
final List<Record> originals, final RecordReader reader,
-                         final ProcessSession session, final FlowFile input, 
final Map<String, String> requestParameters,
-                         final List<FlowFile> resultRecords, final AtomicLong 
erroredRecords, final AtomicLong successfulRecords)
+                         final ProcessSession session, final FlowFile input, 
final IndexOperationParameters indexOperationParameters,
+                         final List<FlowFile> resultRecords, final AtomicLong 
erroredRecords, final AtomicLong successfulRecords, final int batch)
             throws IOException, SchemaNotFoundException, 
MalformedRecordException {
 
         final BulkOperation bundle = new BulkOperation(operationList, 
originals, reader.getSchema());
-        final ResponseDetails responseDetails = indexDocuments(bundle, 
session, input, requestParameters);
+        final ResponseDetails responseDetails = indexDocuments(bundle, 
session, input, indexOperationParameters, batch);
 
         successfulRecords.getAndAdd(responseDetails.successCount());
         erroredRecords.getAndAdd(responseDetails.errorCount());
@@ -522,8 +522,9 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
     }
 
     private ResponseDetails indexDocuments(final BulkOperation bundle, final 
ProcessSession session, final FlowFile input,
-                                           final Map<String, String> 
requestParameters) throws IOException, SchemaNotFoundException {
-        final IndexOperationResponse response = 
clientService.get().bulk(bundle.getOperationList(), requestParameters);
+                                           final IndexOperationParameters 
indexOperationParameters, final int batch)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final IndexOperationResponse response = 
clientService.get().bulk(bundle.getOperationList(), 
indexOperationParameters.getRequestParameters());
 
         final Map<Integer, Map<String, Object>> errors = 
findElasticsearchResponseErrors(response);
         if (!errors.isEmpty()) {
@@ -534,12 +535,23 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         final int numSuccessful = response.getItems() == null ? 0 : 
response.getItems().size() - numErrors;
         final Map<String, Output> outputs = new HashMap<>();
 
-        try {
+        try (final InputStream inStream = session.read(input);
+             final RecordReader inputReader = 
readerFactory.createRecordReader(input, inStream, getLogger())) {
+
+            // if there are errors present, skip through the input FlowFile to 
the current batch of records
+            if (numErrors > 0) {
+                for (int r = 0; r < batch * 
indexOperationParameters.getBatchSize(); r++) {
+                    inputReader.nextRecord();
+                }
+            }
+
             for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
                 final String type;
                 final Relationship relationship;
                 final Map<String, Object> error;
-                if (errors.containsKey(o)) {
+                final Record outputRecord;
+                final RecordSchema recordSchema;
+                if (numErrors > 0 && errors.containsKey(o)) {
                     relationship = REL_ERRORS;
                     error = errors.get(o);
                     if (groupBulkErrors) {
@@ -551,19 +563,27 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
                     } else {
                         type = OUTPUT_TYPE_ERROR;
                     }
+                    outputRecord = inputReader.nextRecord();
+                    recordSchema = outputRecord.getSchema();
                 } else {
                     relationship = REL_SUCCESSFUL;
                     error = null;
                     type = OUTPUT_TYPE_SUCCESS;
+                    outputRecord = bundle.getOriginalRecords().get(o);
+                    recordSchema = bundle.getSchema();
+                    // skip the associated Input Record for this successful 
Record
+                    if (numErrors > 0) {
+                        inputReader.nextRecord();
+                    }
                 }
-                final Output output = getOutputByType(outputs, type, session, 
relationship, input, bundle.getSchema());
-                output.write(bundle.getOriginalRecords().get(o), error);
+                final Output output = getOutputByType(outputs, type, session, 
relationship, input, recordSchema);
+                output.write(outputRecord, error);
             }
 
             for (final Output output : outputs.values()) {
                 output.transfer(session);
             }
-        } catch (final IOException | SchemaNotFoundException ex) {
+        } catch (final IOException | SchemaNotFoundException | 
MalformedRecordException ex) {
             getLogger().error("Unable to write error/successful records", ex);
             outputs.values().forEach(o -> {
                 try {
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
index 621c223f31..74e9f51270 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
@@ -330,8 +330,8 @@ public class PutElasticsearchJsonTest extends 
AbstractPutElasticsearchTest {
     }
 
     @Test
-    void testRetriable() {
-        clientService.setThrowRetriableError(true);
+    void testRetryable() {
+        clientService.setThrowRetryableError(true);
         basicTest(0, 1, 0);
     }
 
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
index 172f24aab6..4305c2b240 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
@@ -209,12 +209,12 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
         
assertTrue(runner.getProcessContext().getProperties().keySet().stream().noneMatch(pd
 -> "put-es-record-not_found-is-error".equals(pd.getName())));
 
         
assertTrue(runner.getProcessContext().getProperties().containsKey(PutElasticsearchRecord.RESULT_RECORD_WRITER));
-        final RecordSetWriterFactory writer = runner.getControllerService(
+        final RecordSetWriterFactory migratedWriter = 
runner.getControllerService(
                 
runner.getProcessContext().getProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER.getName()).getValue(),
                 RecordSetWriterFactory.class
         );
-        assertNotNull(writer);
-        assertTrue(runner.isControllerServiceEnabled(writer));
+        assertNotNull(migratedWriter);
+        assertTrue(runner.isControllerServiceEnabled(migratedWriter));
 
         assertEquals(1, result.getPropertiesRenamed().size());
         
assertEquals(AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName(), 
result.getPropertiesRenamed().get("put-es-record-not_found-is-error"));
@@ -288,6 +288,7 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
         runner.addControllerService("mockReader", mockReader);
         runner.setProperty(PutElasticsearchRecord.RECORD_READER, "mockReader");
         runner.enableControllerService(mockReader);
+        runner.setAllowRecursiveReads(true);
 
         basicTest(0, 0, 1);
     }
@@ -299,8 +300,8 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
     }
 
     @Test
-    void testRetriable() {
-        clientService.setThrowRetriableError(true);
+    void testRetryable() {
+        clientService.setThrowRetryableError(true);
         basicTest(0, 1, 0);
     }
 
@@ -724,6 +725,31 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
 
     @Test
     void testFailedRecordsOutput() throws Exception {
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true");
+        runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, 
"true");
+        runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true");
+        final int errorCount = 3;
+        final int successCount = 4;
+        testErrorRelationship(errorCount, 1, successCount);
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+        // id should remain in errors output
+        
assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent().contains("\"id\":\"5\""));
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        // id should remain in successful output
+        
assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().getContent().contains("\"id\":\"1\""));
+        
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
+        
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
 String.valueOf(errorCount));
+        
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
+                String.valueOf(successCount));
+    }
+
+    @Test
+    void testFailedRecordsOutputRemoveIdField() throws Exception {
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "false");
         runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, 
"true");
         runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true");
         final int errorCount = 3;
@@ -732,7 +758,11 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
 
         runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
         runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+        // id should remain in errors output
+        
assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent().contains("\"id\":\"5\""));
         runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        // id should be removed from successful output
+        
assertFalse(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().getContent().contains("\"id\":\"1\""));
         
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
         
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
 String.valueOf(errorCount));
         
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java
index 23bb97998f..0c99bb3be1 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java
@@ -60,7 +60,9 @@ abstract class AbstractElasticsearch_IT<P extends 
ElasticsearchRestProcessor> ex
 
         runner.setProperty(ElasticsearchRestProcessor.CLIENT_SERVICE, 
CLIENT_SERVICE_NAME);
         runner.setProperty(ElasticsearchRestProcessor.INDEX, INDEX);
-        runner.setProperty(ElasticsearchRestProcessor.TYPE, type);
+        if (!"".equals(type)) {
+            runner.setProperty(ElasticsearchRestProcessor.TYPE, type);
+        }
 
         service.refresh(null, null);
     }
@@ -73,7 +75,7 @@ abstract class AbstractElasticsearch_IT<P extends 
ElasticsearchRestProcessor> ex
     @AfterAll
     static void afterAll() {
         tearDownTestData(TEST_INDICES);
-        stopTestcontainer();
+        stopTestContainer();
     }
 
     @Test
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
index 1c9bc12e4c..a86edb4b44 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
@@ -16,11 +16,187 @@
  */
 package org.apache.nifi.processors.elasticsearch.integration;
 
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processors.elasticsearch.AbstractPutElasticsearch;
+import org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor;
 import org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class PutElasticsearchRecord_IT extends 
AbstractElasticsearch_IT<AbstractPutElasticsearch> {
     AbstractPutElasticsearch getProcessor() {
         return new PutElasticsearchRecord();
     }
+
+    @Override
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+
+        final RecordReaderFactory reader = new JsonTreeReader();
+        runner.addControllerService("reader", reader);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaInferenceUtil.INFER_SCHEMA);
+        runner.assertValid(reader);
+        runner.enableControllerService(reader);
+        runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader");
+
+        final JsonRecordSetWriter writer = new JsonRecordSetWriter();
+        runner.addControllerService("writer", writer);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        runner.setProperty(writer, JsonRecordSetWriter.SUPPRESS_NULLS, 
JsonRecordSetWriter.ALWAYS_SUPPRESS);
+        runner.assertValid(writer);
+        runner.enableControllerService(writer);
+        runner.setProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER, 
"writer");
+    }
+
+    @Test
+    void testErrorRecordsOutputWithoutId() {
+        final String json = """
+                [
+                    {"id": "1", "foo": false},
+                    {"id": "2", "foo": 123}
+                ]
+                """;
+
+        runner.setProperty(ElasticsearchRestProcessor.INDEX, "test-errors");
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "false");
+
+        runner.enqueue(json);
+        runner.run();
+
+        // record 1 should succeed and set the index mapping of "foo" to 
boolean, but the id field should not be in the successful output
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        final String successfulContent = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().getContent();
+        assertFalse(successfulContent.contains("\"id\":"), successfulContent);
+        assertTrue(successfulContent.contains("\"foo\":false"), 
successfulContent);
+
+        // record 2 should fail because an int cannot be indexed into a 
boolean field, the id field should remain in the error output
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+        final String errorContent = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent();
+        assertTrue(errorContent.contains("\"id\":\"2\""), errorContent);
+        assertTrue(errorContent.contains("\"foo\":123"), errorContent);
+    }
+
+    @Test
+    void testErrorRecordsOutputMultipleBatches() {
+        final String json = """
+                [
+                    {"id": "1", "foo": false},
+                    {"id": "2", "foo": 123},
+                    {"id": "3", "foo": true},
+                    {"id": "4", "foo": false},
+                    {"id": "5", "foo": 456}
+                ]
+                """;
+
+        runner.setProperty(ElasticsearchRestProcessor.INDEX, 
"test-errors-batches");
+        runner.setProperty(PutElasticsearchRecord.BATCH_SIZE, "2");
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true");
+
+        // allow multiple reads of the input FlowFile
+        runner.setAllowRecursiveReads(true);
+
+        runner.enqueue(json);
+        runner.run();
+
+        // records 1, 3 & 4 (output in 2 FlowFiles) should succeed and set the 
index mapping of "foo" to boolean, with id field retained
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 2);
+
+        final MockFlowFile successful1 = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst();
+        successful1.assertAttributeEquals("record.count", "1");
+        final String successful1Content = successful1.getContent();
+        assertTrue(successful1Content.contains("\"id\":\"1\""), 
successful1Content);
+        assertTrue(successful1Content.contains("\"foo\":false"), 
successful1Content);
+
+        final MockFlowFile successful2 = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).get(1);
+        successful2.assertAttributeEquals("record.count", "2");
+        final String successful2Content = successful2.getContent();
+        assertTrue(successful2Content.contains("\"id\":\"3\""), 
successful2Content);
+        assertTrue(successful2Content.contains("\"id\":\"4\""), 
successful2Content);
+
+        // record 2 & 5 (in 2 batches) should fail because an int cannot be 
indexed into a boolean field, the id field should remain in the error output
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 2);
+
+        final MockFlowFile error1 = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst();
+        error1.assertAttributeEquals("record.count", "1");
+        final String error1Content = error1.getContent();
+        assertTrue(error1Content.contains("\"id\":\"2\""), error1Content);
+        assertTrue(error1Content.contains("\"foo\":123"), error1Content);
+
+        final MockFlowFile error2 = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(1);
+        error2.assertAttributeEquals("record.count", "1");
+        final String error2Content = error2.getContent();
+        assertTrue(error2Content.contains("\"id\":\"5\""), error2Content);
+        assertTrue(error2Content.contains("\"foo\":456"), error2Content);
+    }
+
+    @Test
+    void testUpdateError() {
+        final String json = """
+                {"id": "123", "foo": "bar"}
+                """;
+
+        runner.setProperty(AbstractPutElasticsearch.INDEX_OP, "update");
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+
+        runner.enqueue(json);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+        final String errorContent = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent();
+        assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
+        assertTrue(errorContent.contains("\"foo\":\"bar\""), errorContent);
+    }
+
+    @Test
+    void testUpdateScriptError() {
+        final String json = """
+                {"id": "123", "script": {"source": "ctx._source.counter += 
params.count"}}
+                """;
+
+        runner.setProperty(AbstractPutElasticsearch.INDEX_OP, "update");
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchRecord.SCRIPT_RECORD_PATH, 
"/script");
+
+        runner.enqueue(json);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+        final String errorContent = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent();
+        assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
+        assertTrue(errorContent.contains("\"script\":{"), errorContent);
+    }
+
+    @Test
+    void testScriptedUpsertError() {
+        final String json = """
+                {"id": "123", "script": {"source": "ctx._source.counter += 
params.count"}, "upsert": true}
+                """;
+
+        runner.setProperty(AbstractPutElasticsearch.INDEX_OP, "upsert");
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchRecord.SCRIPT_RECORD_PATH, 
"/script");
+        runner.setProperty(PutElasticsearchRecord.SCRIPTED_UPSERT_RECORD_PATH, 
"/upsert");
+
+        runner.enqueue(json);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+        final String errorContent = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent();
+        assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
+        assertTrue(errorContent.contains("\"script\":{"), errorContent);
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java
index 9f3d6fd566..1d641912de 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java
@@ -121,12 +121,12 @@ public class AbstractMockElasticsearchClient extends 
AbstractControllerService i
         return String.format("http://localhost:9200/%s/%s";, index, 
StringUtils.isNotBlank(type) ? type : "");
     }
 
-    public boolean isThrowRetriableError() {
-        return throwRetriableError;
+    public boolean isThrowRetryableError() {
+        return throwRetryableError;
     }
 
-    public void setThrowRetriableError(final boolean throwRetriableError) {
-        this.throwRetriableError = throwRetriableError;
+    public void setThrowRetryableError(final boolean throwRetryableError) {
+        this.throwRetryableError = throwRetryableError;
     }
 
     public boolean isThrowFatalError() {
@@ -137,6 +137,6 @@ public class AbstractMockElasticsearchClient extends 
AbstractControllerService i
         this.throwFatalError = throwFatalError;
     }
 
-    private boolean throwRetriableError;
+    private boolean throwRetryableError;
     private boolean throwFatalError;
 }
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java
index 55d89e4600..a2a4af6aa1 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java
@@ -30,7 +30,7 @@ public class MockBulkLoadClientService extends 
AbstractMockElasticsearchClient {
 
     @Override
     public IndexOperationResponse bulk(final List<IndexOperationRequest> 
items, final Map<String, String> requestParameters) {
-        if (isThrowRetriableError()) {
+        if (isThrowRetryableError()) {
             throw new MockElasticsearchException(true, false);
         } else if (isThrowFatalError()) {
             throw new MockElasticsearchException(false, false);
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 163b8fad99..c7e811496d 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -52,7 +52,7 @@ import static org.apache.http.auth.AuthScope.ANY;
 public abstract class AbstractElasticsearchITBase {
     // default Elasticsearch version should (ideally) match that in the 
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
     protected static final DockerImageName IMAGE = DockerImageName
-            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.17.0"));
+            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.17.2"));
     protected static final String ELASTIC_USER_PASSWORD = 
System.getProperty("elasticsearch.elastic_user.password", 
RandomStringUtils.randomAlphanumeric(10, 20));
     private static final int PORT = 9200;
     protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = 
new ElasticsearchContainer(IMAGE)
@@ -79,7 +79,7 @@ public abstract class AbstractElasticsearchITBase {
 
     protected static final boolean ENABLE_TEST_CONTAINERS = 
"true".equalsIgnoreCase(System.getProperty("elasticsearch.testcontainers.enabled"));
     protected static String elasticsearchHost;
-    protected static void startTestcontainer() {
+    protected static void startTestContainer() {
         if (ENABLE_TEST_CONTAINERS) {
             if (getElasticMajorVersion() == 6) {
                 // disable system call filter check to allow Elasticsearch 6 
to run on aarch64 machines (e.g. Mac M1/2)
@@ -99,7 +99,7 @@ public abstract class AbstractElasticsearchITBase {
 
     static RestClient testDataManagementClient;
 
-    protected static void stopTestcontainer() {
+    protected static void stopTestContainer() {
         if (ENABLE_TEST_CONTAINERS) {
             ELASTICSEARCH_CONTAINER.stop();
         }
@@ -107,7 +107,7 @@ public abstract class AbstractElasticsearchITBase {
 
     @BeforeAll
     static void beforeAll() throws IOException {
-        startTestcontainer();
+        startTestContainer();
         type = getElasticMajorVersion() == 6 ? "_doc" : "";
         System.out.printf("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nIMAGE: 
%s:%s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",
                 type, IMAGE.getRepository(), IMAGE.getVersionPart());
@@ -201,23 +201,13 @@ public abstract class AbstractElasticsearchITBase {
         return actions;
     }
 
-    private static final class SetupAction {
-        private final String verb;
-        private final String path;
-        private final String json;
-
-        public SetupAction(final String verb, final String path, final String 
json) {
-            this.verb = verb;
-            this.path = path;
-            this.json = json;
-        }
-
+    private record SetupAction(String verb, String path, String json) {
         @Override
-        public String toString() {
-            return "SetupAction{" +
-                    "verb='" + verb + '\'' +
-                    ", path='" + path + '\'' +
-                    '}';
+            public String toString() {
+                return "SetupAction{" +
+                        "verb='" + verb + '\'' +
+                        ", path='" + path + '\'' +
+                        '}';
+            }
         }
-    }
 }
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
index dd7f1c4153..1e78810b06 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -33,7 +33,7 @@ language governing permissions and limitations under the 
License. -->
     </modules>
 
     <properties>
-        <elasticsearch.client.version>8.17.1</elasticsearch.client.version>
+        <elasticsearch.client.version>8.17.2</elasticsearch.client.version>
     </properties>
 
     <dependencyManagement>
@@ -114,7 +114,7 @@ language governing permissions and limitations under the 
License. -->
         <profile>
             <id>elasticsearch7</id>
             <properties>
-                
<elasticsearch_docker_image>7.17.26</elasticsearch_docker_image>
+                
<elasticsearch_docker_image>7.17.27</elasticsearch_docker_image>
             </properties>
         </profile>
     </profiles>
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index eae7706786..f73971f4a8 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -87,9 +87,10 @@ public class MockProcessSession implements ProcessSession {
     private final Map<FlowFile, OutputStream> openOutputStreams = new 
HashMap<>();
     private final StateManager stateManager;
     private final boolean allowSynchronousCommits;
+    private final boolean allowRecursiveReads;
 
     private boolean committed = false;
-    private boolean rolledback = false;
+    private boolean rolledBack = false;
     private final Set<Long> removedFlowFiles = new HashSet<>();
 
     private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
@@ -108,6 +109,11 @@ public class MockProcessSession implements ProcessSession {
 
     public MockProcessSession(final SharedSessionState sharedState, final 
Processor processor, final boolean enforceStreamsClosed, final StateManager 
stateManager,
                               final boolean allowSynchronousCommits) {
+        this(sharedState, processor, enforceStreamsClosed, stateManager, 
allowSynchronousCommits, false);
+    }
+
+    public MockProcessSession(final SharedSessionState sharedState, final 
Processor processor, final boolean enforceStreamsClosed, final StateManager 
stateManager,
+                              final boolean allowSynchronousCommits, final 
boolean allowRecursiveReads) {
         this.processor = processor;
         this.enforceStreamsClosed = enforceStreamsClosed;
         this.sharedState = sharedState;
@@ -115,6 +121,7 @@ public class MockProcessSession implements ProcessSession {
         this.provenanceReporter = new MockProvenanceReporter(this, 
sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
         this.stateManager = stateManager;
         this.allowSynchronousCommits = allowSynchronousCommits;
+        this.allowRecursiveReads = allowRecursiveReads;
     }
 
     @Override
@@ -337,7 +344,7 @@ public class MockProcessSession implements ProcessSession {
      * session
      */
     public void clearRollback() {
-        rolledback = false;
+        rolledBack = false;
     }
 
     @Override
@@ -822,7 +829,7 @@ public class MockProcessSession implements ProcessSession {
             }
         }
 
-        rolledback = true;
+        rolledBack = true;
         beingProcessed.clear();
         currentVersions.clear();
         originalVersions.clear();
@@ -1092,7 +1099,7 @@ public class MockProcessSession implements ProcessSession 
{
 
     private List<FlowFile> validateState(final Collection<FlowFile> flowFiles) 
{
         return flowFiles.stream()
-            .map(ff -> validateState(ff))
+            .map(this::validateState)
             .collect(Collectors.toList());
     }
 
@@ -1104,7 +1111,7 @@ public class MockProcessSession implements ProcessSession 
{
             throw new FlowFileHandlingException(flowFile + " is not known in 
this session");
         }
 
-        if (readRecursionSet.containsKey(flowFile)) {
+        if (readRecursionSet.containsKey(flowFile) && !allowRecursiveReads) {
             throw new IllegalStateException(flowFile + " already in use for an 
active callback or InputStream created by ProcessSession.read(FlowFile) has not 
been closed");
         }
 
@@ -1234,14 +1241,14 @@ public class MockProcessSession implements 
ProcessSession {
      * Assert that {@link #rollback()} has been called
      */
     public void assertRolledBack() {
-        Assertions.assertTrue(rolledback, "Session was not rolled back");
+        Assertions.assertTrue(rolledBack, "Session was not rolled back");
     }
 
     /**
      * Assert that {@link #rollback()} has not been called
      */
     public void assertNotRolledBack() {
-        Assertions.assertFalse(rolledback, "Session was rolled back");
+        Assertions.assertFalse(rolledBack, "Session was rolled back");
     }
 
     /**
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
index a5d9ff9f40..4fc53ec711 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
@@ -33,19 +33,21 @@ public class MockSessionFactory implements 
ProcessSessionFactory {
     private final boolean enforceReadStreamsClosed;
     private final StateManager stateManager;
     private final boolean allowSynchronousSessionCommits;
+    private final boolean allowRecursiveReads;
 
     MockSessionFactory(final SharedSessionState sharedState, final Processor 
processor, final boolean enforceReadStreamsClosed, final StateManager 
stateManager,
-                       final boolean allowSynchronousSessionCommits) {
+                       final boolean allowSynchronousSessionCommits, final 
boolean allowRecursiveReads) {
         this.sharedState = sharedState;
         this.processor = processor;
         this.enforceReadStreamsClosed = enforceReadStreamsClosed;
         this.stateManager = stateManager;
         this.allowSynchronousSessionCommits = allowSynchronousSessionCommits;
+        this.allowRecursiveReads = allowRecursiveReads;
     }
 
     @Override
     public ProcessSession createSession() {
-        final MockProcessSession session = new MockProcessSession(sharedState, 
processor, enforceReadStreamsClosed, stateManager, 
allowSynchronousSessionCommits);
+        final MockProcessSession session = new MockProcessSession(sharedState, 
processor, enforceReadStreamsClosed, stateManager, 
allowSynchronousSessionCommits, allowRecursiveReads);
         createdSessions.add(session);
         return session;
     }
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 292c7a9e37..68d1e90e20 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -95,6 +95,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
     private int numThreads = 1;
     private MockSessionFactory sessionFactory;
     private boolean allowSynchronousSessionCommits = false;
+    private boolean allowRecursiveReads = false;
     private long runSchedule = 0;
     private final AtomicInteger invocations = new AtomicInteger(0);
 
@@ -128,7 +129,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         this.sharedState = new SharedSessionState(processor, idGenerator);
         this.flowFileQueue = sharedState.getFlowFileQueue();
         this.processorStateManager = new MockStateManager(processor);
-        this.sessionFactory = new MockSessionFactory(sharedState, processor, 
enforceReadStreamsClosed, processorStateManager, 
allowSynchronousSessionCommits);
+        this.sessionFactory = new MockSessionFactory(sharedState, processor, 
enforceReadStreamsClosed, processorStateManager, 
allowSynchronousSessionCommits, allowRecursiveReads);
 
         this.context = new MockProcessContext(processor, processorName, 
processorStateManager, environmentVariables);
         this.kerberosContext = kerberosContext;
@@ -149,7 +150,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
     @Override
     public void enforceReadStreamsClosed(final boolean enforce) {
         enforceReadStreamsClosed = enforce;
-        this.sessionFactory = new MockSessionFactory(sharedState, processor, 
enforceReadStreamsClosed, processorStateManager, 
allowSynchronousSessionCommits);
+        this.sessionFactory = new MockSessionFactory(sharedState, processor, 
enforceReadStreamsClosed, processorStateManager, 
allowSynchronousSessionCommits, allowRecursiveReads);
     }
 
     @Override
@@ -161,7 +162,13 @@ public class StandardProcessorTestRunner implements 
TestRunner {
     @Override
     public void setAllowSynchronousSessionCommits(final boolean 
allowSynchronousSessionCommits) {
         this.allowSynchronousSessionCommits = allowSynchronousSessionCommits;
-        this.sessionFactory = new MockSessionFactory(sharedState, processor, 
enforceReadStreamsClosed, processorStateManager, 
allowSynchronousSessionCommits);
+        this.sessionFactory = new MockSessionFactory(sharedState, processor, 
enforceReadStreamsClosed, processorStateManager, 
allowSynchronousSessionCommits, allowRecursiveReads);
+    }
+
+    @Override
+    public void setAllowRecursiveReads(final boolean allowRecursiveReads) {
+        this.allowRecursiveReads = allowRecursiveReads;
+        this.sessionFactory = new MockSessionFactory(sharedState, processor, 
enforceReadStreamsClosed, processorStateManager, 
allowSynchronousSessionCommits, allowRecursiveReads);
     }
 
     @Override
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 57f39f23e7..0f5c1ecf73 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -891,13 +891,21 @@ public interface TestRunner {
     void setValidateExpressionUsage(boolean validate);
 
     /**
-     * Specifies whether or not the TestRunner will allow 
ProcessSession.commit() to be called.
+     * Specifies whether the TestRunner will allow ProcessSession.commit() to 
be called.
      * By default, the value is <code>false</code>, meaning that any call to 
ProcessSession.commit() will throw
      * an Exception. See JavaDocs for {@link ProcessSession#commit()} for more 
information
-     * @param allow whethr or not to allow asynchronous session commits (i.e., 
calls to ProcessSession.commit())
+     * @param allow whether to allow asynchronous session commits (i.e., calls 
to ProcessSession.commit())
      */
     void setAllowSynchronousSessionCommits(boolean allow);
 
+    /**
+     * Specifies whether the TestRunner will allow ProcessSession.read() 
multiple times for the same FlowFile while an InputStream is already open.
+     * By default, the value is <code>false</code>, meaning that any call to 
ProcessSession.read() for a FlowFile already being read will throw
+     * an Exception. See JavaDocs for {@link ProcessSession#read(FlowFile)} 
for more information
+     * @param allow whether to allow recursive reads of a FlowFile (i.e., 
calls to ProcessSession.read())
+     */
+    void setAllowRecursiveReads(boolean allow);
+
     /**
      * Removes the {@link PropertyDescriptor} from the {@link ProcessContext},
      * effectively setting its value to null, or the property's default value, 
if it has one.

Reply via email to