This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9cfca8580c NIFI-14244 Send original Records to errors output for PutElasticsearchRecord errors (#9706)
9cfca8580c is described below
commit 9cfca8580c2af28fe2c1b78c9e3fb1f7fda30cf3
Author: Chris Sampson <[email protected]>
AuthorDate: Sat Mar 1 17:43:39 2025 +0000
NIFI-14244 Send original Records to errors output for PutElasticsearchRecord errors (#9706)
Signed-off-by: David Handermann <[email protected]>
---
nifi-docs/src/main/asciidoc/user-guide.adoc | 6 +-
.../integration/AbstractElasticsearch_IT.java | 2 +-
.../elasticsearch/AbstractPutElasticsearch.java | 6 +-
.../elasticsearch/PutElasticsearchRecord.java | 64 +++++---
.../elasticsearch/PutElasticsearchJsonTest.java | 4 +-
.../elasticsearch/PutElasticsearchRecordTest.java | 40 ++++-
.../integration/AbstractElasticsearch_IT.java | 6 +-
.../integration/PutElasticsearchRecord_IT.java | 176 +++++++++++++++++++++
.../mock/AbstractMockElasticsearchClient.java | 10 +-
.../mock/MockBulkLoadClientService.java | 2 +-
.../integration/AbstractElasticsearchITBase.java | 32 ++--
.../nifi-elasticsearch-bundle/pom.xml | 4 +-
.../org/apache/nifi/util/MockProcessSession.java | 21 ++-
.../org/apache/nifi/util/MockSessionFactory.java | 6 +-
.../nifi/util/StandardProcessorTestRunner.java | 13 +-
.../main/java/org/apache/nifi/util/TestRunner.java | 12 +-
16 files changed, 323 insertions(+), 81 deletions(-)
diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc
index a2e5fdf872..18e75e8905 100644
--- a/nifi-docs/src/main/asciidoc/user-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/user-guide.adoc
@@ -695,9 +695,9 @@ In order for a Processor to be considered valid and able to run, each Relationsh
===== Automatically Retry
Users can also configure whether or not FlowFiles routed to a given Relationship should be retried. If a FlowFile is routed to any Relationship that is configured to be retried,
-the FlowFile will be re-queued and the Processor will attempt to process it again. If the Processor routes the FlowFile to a retriable Relationship again (either the same Relationship
+the FlowFile will be re-queued and the Processor will attempt to process it again. If the Processor routes the FlowFile to a retryable Relationship again (either the same Relationship
or another that is configured to be retried), it will be re-queued again, up to the number of
-times specified by the user. If the Processor routes the FlowFile to a retriable Relationship after the specified number of retries, the FlowFile will be transferred to
+times specified by the user. If the Processor routes the FlowFile to a retryable Relationship after the specified number of retries, the FlowFile will be transferred to
the Connection(s) that include that Relationship - or auto-terminated, as configured.
If the Processor routes the FlowFile to any Relationship that is not configured to be retried, it will be routed to that Relationship immediately.
@@ -705,7 +705,7 @@ For example, consider a Processor with two relationships: `success` and `failure
A user configures the `failure` Relationship to retry 10 times and also be configured to auto-terminate. In this
case, if an incoming FlowFile is routed to the `failure` Relationship,
it will be retried up to 10 times. After 10 attempts, if it is routed to `failure` again, it will be auto-terminated. However, if at any point it is
-routed to `success`, it will immediatley be transferred to the Connection(s) that include the `success` Relationship and not retried any further.
+routed to `success`, it will immediately be transferred to the Connection(s) that include the `success` Relationship and not retried any further.
====== Number of Retry Attempts
For relationships set to retry, this number indicates how many times a FlowFile will attempt to reprocess before it is routed elsewhere.
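The retry behaviour the documentation hunk above describes can be sketched as a small decision function. This is a hedged, illustrative sketch only, not NiFi's actual implementation; the class and method names are hypothetical.

```java
// Illustrative sketch of the documented retry rule: a FlowFile routed to a
// retryable Relationship is re-queued until the configured retry count is
// exhausted, then transferred (or auto-terminated, as configured).
public class RetrySemanticsSketch {

    /** Decide whether a FlowFile routed to a Relationship is re-queued or transferred. */
    static String route(boolean relationshipRetryable, int retriesSoFar, int maxRetries) {
        if (!relationshipRetryable) {
            // Relationships not configured to retry transfer immediately
            return "TRANSFER";
        }
        // retryable: re-queue until the configured number of retries is used up
        return retriesSoFar < maxRetries ? "RETRY" : "TRANSFER";
    }

    public static void main(String[] args) {
        // `failure` configured to retry 10 times: re-queued while attempts remain,
        // then transferred (auto-terminated if so configured)
        if (!"RETRY".equals(route(true, 9, 10))) throw new AssertionError();
        if (!"TRANSFER".equals(route(true, 10, 10))) throw new AssertionError();
        // `success` not configured to retry: transferred immediately
        if (!"TRANSFER".equals(route(false, 0, 10))) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The same FlowFile may alternate between retryable Relationships; per the documentation, every routing to a retryable Relationship counts against the configured maximum.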
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
index 064e88ce6b..434c830de7 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java
@@ -65,6 +65,6 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
@AfterAll
static void afterAll() {
tearDownTestData(TEST_INDICES);
- stopTestcontainer();
+ stopTestContainer();
}
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
index fa3c1ca3c8..2956173baf 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -60,12 +60,12 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
.description("All flowfiles that are sent to Elasticsearch without request failures go to this relationship.")
.build();
- static final Relationship REL_SUCCESSFUL = new Relationship.Builder()
+ public static final Relationship REL_SUCCESSFUL = new Relationship.Builder()
.name("successful")
.description("Record(s)/Flowfile(s) corresponding to Elasticsearch document(s) that did not result in an \"error\" (within Elasticsearch) will be routed here.")
.build();
- static final Relationship REL_ERRORS = new Relationship.Builder()
+ public static final Relationship REL_ERRORS = new Relationship.Builder()
.name("errors")
.description("Record(s)/Flowfile(s) corresponding to Elasticsearch document(s) that resulted in an \"error\" (within Elasticsearch) will be routed here.")
.build();
@@ -96,7 +96,7 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
.required(true)
.build();
- static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
.name("put-es-record-index-op")
.displayName("Index Operation")
.description("The type of the operation used to index (create, delete, index, update, upsert)")
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 0c6aab182e..507f031511 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -118,7 +118,7 @@ import java.util.concurrent.atomic.AtomicLong;
resource = SystemResource.MEMORY,
description = "The Batch of Records will be stored in memory until the bulk operation is performed.")
public class PutElasticsearchRecord extends AbstractPutElasticsearch {
- static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("put-es-record-reader")
.displayName("Record Reader")
.description("The record reader to use for reading incoming records from flowfiles.")
@@ -126,7 +126,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.required(true)
.build();
- static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
.description("The number of records to send over in a single batch.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -151,7 +151,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-id-path")
.displayName("ID Record Path")
.description("A record path expression to retrieve the ID field for use with Elasticsearch. If left blank " +
@@ -161,7 +161,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor RETAIN_ID_FIELD = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor RETAIN_ID_FIELD = new PropertyDescriptor.Builder()
.name("put-es-record-retain-id-field")
.displayName("Retain ID (Record Path)")
.description("Whether to retain the existing field used as the ID Record Path.")
@@ -201,7 +201,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor SCRIPT_RECORD_PATH = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor SCRIPT_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-script-path")
.displayName("Script Record Path")
.description("A RecordPath pointing to a field in the record(s) that contains the script for the document update/upsert. " +
@@ -211,7 +211,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor SCRIPTED_UPSERT_RECORD_PATH = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor SCRIPTED_UPSERT_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-scripted-upsert-path")
.displayName("Scripted Upsert Record Path")
.description("A RecordPath pointing to a field in the record(s) that contains the scripted_upsert boolean flag. " +
@@ -247,7 +247,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor RESULT_RECORD_WRITER = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor RESULT_RECORD_WRITER = new PropertyDescriptor.Builder()
.name("put-es-record-error-writer")
.displayName("Result Record Writer")
.description("The response from Elasticsearch will be examined for failed records " +
@@ -399,7 +399,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
final List<FlowFile> resultRecords = new ArrayList<>();
- final AtomicLong erroredRecords = new AtomicLong(0);
+ final AtomicLong errorRecords = new AtomicLong(0);
final AtomicLong successfulRecords = new AtomicLong(0);
final StopWatch stopWatch = new StopWatch(true);
final Set<String> indices = new HashSet<>();
@@ -418,13 +418,13 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
originals.add(record);
if (operationList.size() == indexOperationParameters.getBatchSize() || !recordSet.isAnotherRecord()) {
- operate(operationList, originals, reader, session, input, indexOperationParameters.getRequestParameters(), resultRecords, erroredRecords, successfulRecords);
+ operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
batches++;
}
}
if (!operationList.isEmpty()) {
- operate(operationList, originals, reader, session, input, indexOperationParameters.getRequestParameters(), resultRecords, erroredRecords, successfulRecords);
+ operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
batches++;
}
} catch (final ElasticsearchException ese) {
} catch (final ElasticsearchException ese) {
@@ -452,12 +452,12 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
session.getProvenanceReporter().send(
input,
clientService.get().getTransitUrl(String.join(",", indices), types.isEmpty() ? null : String.join(",", types)),
- String.format(Locale.getDefault(), "%d Elasticsearch _bulk operation batch(es) [%d error(s), %d success(es)]", batches, erroredRecords.get(), successfulRecords.get()),
+ String.format(Locale.getDefault(), "%d Elasticsearch _bulk operation batch(es) [%d error(s), %d success(es)]", batches, errorRecords.get(), successfulRecords.get()),
stopWatch.getDuration(TimeUnit.MILLISECONDS)
);
input = session.putAllAttributes(input, new HashMap<>() {{
- put("elasticsearch.put.error.count", String.valueOf(erroredRecords.get()));
+ put("elasticsearch.put.error.count", String.valueOf(errorRecords.get()));
put("elasticsearch.put.success.count", String.valueOf(successfulRecords.get()));
}});
@@ -498,12 +498,12 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
}
private void operate(final List<IndexOperationRequest> operationList, final List<Record> originals, final RecordReader reader,
- final ProcessSession session, final FlowFile input, final Map<String, String> requestParameters,
- final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords)
+ final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters,
+ final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords, final int batch)
throws IOException, SchemaNotFoundException, MalformedRecordException {
final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
- final ResponseDetails responseDetails = indexDocuments(bundle, session, input, requestParameters);
+ final ResponseDetails responseDetails = indexDocuments(bundle, session, input, indexOperationParameters, batch);
successfulRecords.getAndAdd(responseDetails.successCount());
erroredRecords.getAndAdd(responseDetails.errorCount());
@@ -522,8 +522,9 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
}
private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input,
- final Map<String, String> requestParameters) throws IOException, SchemaNotFoundException {
- final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), requestParameters);
+ final IndexOperationParameters indexOperationParameters, final int batch)
+ throws IOException, SchemaNotFoundException, MalformedRecordException {
+ final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), indexOperationParameters.getRequestParameters());
final Map<Integer, Map<String, Object>> errors = findElasticsearchResponseErrors(response);
if (!errors.isEmpty()) {
@@ -534,12 +535,23 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
final Map<String, Output> outputs = new HashMap<>();
- try {
+ try (final InputStream inStream = session.read(input);
+ final RecordReader inputReader = readerFactory.createRecordReader(input, inStream, getLogger())) {
+
+ // if there are errors present, skip through the input FlowFile to the current batch of records
+ if (numErrors > 0) {
+ for (int r = 0; r < batch * indexOperationParameters.getBatchSize(); r++) {
+ inputReader.nextRecord();
+ }
+ }
+
for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
final String type;
final Relationship relationship;
final Map<String, Object> error;
- if (errors.containsKey(o)) {
+ final Record outputRecord;
+ final RecordSchema recordSchema;
+ if (numErrors > 0 && errors.containsKey(o)) {
relationship = REL_ERRORS;
error = errors.get(o);
if (groupBulkErrors) {
@@ -551,19 +563,27 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
} else {
type = OUTPUT_TYPE_ERROR;
}
+ outputRecord = inputReader.nextRecord();
+ recordSchema = outputRecord.getSchema();
} else {
relationship = REL_SUCCESSFUL;
error = null;
type = OUTPUT_TYPE_SUCCESS;
+ outputRecord = bundle.getOriginalRecords().get(o);
+ recordSchema = bundle.getSchema();
+ // skip the associated Input Record for this successful Record
+ if (numErrors > 0) {
+ inputReader.nextRecord();
+ }
}
- final Output output = getOutputByType(outputs, type, session, relationship, input, bundle.getSchema());
- output.write(bundle.getOriginalRecords().get(o), error);
+ final Output output = getOutputByType(outputs, type, session, relationship, input, recordSchema);
+ output.write(outputRecord, error);
}
for (final Output output : outputs.values()) {
output.transfer(session);
}
- } catch (final IOException | SchemaNotFoundException ex) {
+ } catch (final IOException | SchemaNotFoundException | MalformedRecordException ex) {
getLogger().error("Unable to write error/successful records", ex);
outputs.values().forEach(o -> {
try {
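The reworked error handling above re-reads the original FlowFile so that records routed to `errors` keep their original fields (for example an ID field that was removed before indexing), while successful records reuse the already-transformed in-memory copy; the reader is first advanced past earlier batches. A minimal sketch of that alignment logic follows, with hypothetical names (`partition`, `rereadInput`, `prepared`) standing in for NiFi's actual API:

```java
// Hedged sketch (not NiFi's implementation) of pairing bulk-response items
// with records: errored items take the record re-read from the original input,
// successful items reuse the prepared (already-transformed) record.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class BatchAlignmentSketch {
    record Outputs(List<String> successful, List<String> errors) { }

    static Outputs partition(List<String> rereadInput, List<String> prepared,
                             Set<Integer> errorIndexes, int batch, int batchSize) {
        final List<String> successful = new ArrayList<>();
        final List<String> errors = new ArrayList<>();
        // mirror the inputReader.nextRecord() skip loop for earlier batches
        int inputPos = batch * batchSize;
        for (int o = 0; o < prepared.size(); o++) {
            if (errorIndexes.contains(o)) {
                errors.add(rereadInput.get(inputPos)); // original record, ID field intact
            } else {
                successful.add(prepared.get(o)); // record as sent to Elasticsearch
            }
            inputPos++; // the input reader advances for successes too
        }
        return new Outputs(successful, errors);
    }

    public static void main(String[] args) {
        // batch 1 of size 2 covers input records at indexes 2 and 3;
        // the bulk response flags the second item of the batch as an error
        final List<String> original = List.of("r1+id", "r2+id", "r3+id", "r4+id");
        final List<String> prepared = List.of("r3", "r4");
        final Outputs out = partition(original, prepared, Set.of(1), 1, 2);
        if (!out.successful().equals(List.of("r3"))) throw new AssertionError();
        if (!out.errors().equals(List.of("r4+id"))) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Advancing `inputPos` on every item, not only on errors, is what keeps the re-read stream and the bulk-response indexes in lockstep within a batch.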
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
index 621c223f31..74e9f51270 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
@@ -330,8 +330,8 @@ public class PutElasticsearchJsonTest extends AbstractPutElasticsearchTest {
}
@Test
- void testRetriable() {
- clientService.setThrowRetriableError(true);
+ void testRetryable() {
+ clientService.setThrowRetryableError(true);
basicTest(0, 1, 0);
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
index 172f24aab6..4305c2b240 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
@@ -209,12 +209,12 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest {
assertTrue(runner.getProcessContext().getProperties().keySet().stream().noneMatch(pd -> "put-es-record-not_found-is-error".equals(pd.getName())));
assertTrue(runner.getProcessContext().getProperties().containsKey(PutElasticsearchRecord.RESULT_RECORD_WRITER));
- final RecordSetWriterFactory writer = runner.getControllerService(
+ final RecordSetWriterFactory migratedWriter = runner.getControllerService(
runner.getProcessContext().getProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER.getName()).getValue(),
RecordSetWriterFactory.class
);
- assertNotNull(writer);
- assertTrue(runner.isControllerServiceEnabled(writer));
+ assertNotNull(migratedWriter);
+ assertTrue(runner.isControllerServiceEnabled(migratedWriter));
assertEquals(1, result.getPropertiesRenamed().size());
assertEquals(AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName(),
result.getPropertiesRenamed().get("put-es-record-not_found-is-error"));
@@ -288,6 +288,7 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest {
runner.addControllerService("mockReader", mockReader);
runner.setProperty(PutElasticsearchRecord.RECORD_READER, "mockReader");
runner.enableControllerService(mockReader);
+ runner.setAllowRecursiveReads(true);
basicTest(0, 0, 1);
}
@@ -299,8 +300,8 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest {
}
@Test
- void testRetriable() {
- clientService.setThrowRetriableError(true);
+ void testRetryable() {
+ clientService.setThrowRetryableError(true);
basicTest(0, 1, 0);
}
@@ -724,6 +725,31 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest {
@Test
void testFailedRecordsOutput() throws Exception {
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+ runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true");
+ runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, "true");
+ runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true");
+ final int errorCount = 3;
+ final int successCount = 4;
+ testErrorRelationship(errorCount, 1, successCount);
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+ // id should remain in errors output
+ assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent().contains("\"id\":\"5\""));
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ // id should remain in successful output
+ assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().getContent().contains("\"id\":\"1\""));
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
+ runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
+ String.valueOf(errorCount));
+ runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
+ String.valueOf(successCount));
+ }
+
+ @Test
+ void testFailedRecordsOutputRemoveIdField() throws Exception {
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+ runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "false");
runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, "true");
runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true");
final int errorCount = 3;
@@ -732,7 +758,11 @@ public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest {
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+ // id should remain in errors output
+ assertTrue(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent().contains("\"id\":\"5\""));
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ // id should be removed from successful output
+ assertFalse(runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().getContent().contains("\"id\":\"1\""));
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
String.valueOf(errorCount));
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT,
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java
index 23bb97998f..0c99bb3be1 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java
@@ -60,7 +60,9 @@ abstract class AbstractElasticsearch_IT<P extends ElasticsearchRestProcessor> ex
runner.setProperty(ElasticsearchRestProcessor.CLIENT_SERVICE, CLIENT_SERVICE_NAME);
runner.setProperty(ElasticsearchRestProcessor.INDEX, INDEX);
- runner.setProperty(ElasticsearchRestProcessor.TYPE, type);
+ if (!"".equals(type)) {
+ runner.setProperty(ElasticsearchRestProcessor.TYPE, type);
+ }
service.refresh(null, null);
}
@@ -73,7 +75,7 @@ abstract class AbstractElasticsearch_IT<P extends ElasticsearchRestProcessor> ex
@AfterAll
static void afterAll() {
tearDownTestData(TEST_INDICES);
- stopTestcontainer();
+ stopTestContainer();
}
@Test
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
index 1c9bc12e4c..a86edb4b44 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
@@ -16,11 +16,187 @@
*/
package org.apache.nifi.processors.elasticsearch.integration;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.elasticsearch.AbstractPutElasticsearch;
+import org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor;
import org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class PutElasticsearchRecord_IT extends AbstractElasticsearch_IT<AbstractPutElasticsearch> {
AbstractPutElasticsearch getProcessor() {
return new PutElasticsearchRecord();
}
+
+ @Override
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+
+ final RecordReaderFactory reader = new JsonTreeReader();
+ runner.addControllerService("reader", reader);
+ runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
+ runner.assertValid(reader);
+ runner.enableControllerService(reader);
+ runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader");
+
+ final JsonRecordSetWriter writer = new JsonRecordSetWriter();
+ runner.addControllerService("writer", writer);
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+ runner.setProperty(writer, JsonRecordSetWriter.SUPPRESS_NULLS, JsonRecordSetWriter.ALWAYS_SUPPRESS);
+ runner.assertValid(writer);
+ runner.enableControllerService(writer);
+ runner.setProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER, "writer");
+ }
+
+ @Test
+ void testErrorRecordsOutputWithoutId() {
+ final String json = """
+ [
+ {"id": "1", "foo": false},
+ {"id": "2", "foo": 123}
+ ]
+ """;
+
+ runner.setProperty(ElasticsearchRestProcessor.INDEX, "test-errors");
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+ runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "false");
+
+ runner.enqueue(json);
+ runner.run();
+
+ // record 1 should succeed and set the index mapping of "foo" to boolean, but the id field should not be in the successful output
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ final String successfulContent = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst().getContent();
+ assertFalse(successfulContent.contains("\"id\":"), successfulContent);
+ assertTrue(successfulContent.contains("\"foo\":false"), successfulContent);
+
+ // record 2 should fail because an int cannot be indexed into a boolean field, the id field should remain in the error output
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+ final String errorContent = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent();
+ assertTrue(errorContent.contains("\"id\":\"2\""), errorContent);
+ assertTrue(errorContent.contains("\"foo\":123"), errorContent);
+ }
+
+ @Test
+ void testErrorRecordsOutputMultipleBatches() {
+ final String json = """
+ [
+ {"id": "1", "foo": false},
+ {"id": "2", "foo": 123},
+ {"id": "3", "foo": true},
+ {"id": "4", "foo": false},
+ {"id": "5", "foo": 456}
+ ]
+ """;
+
+ runner.setProperty(ElasticsearchRestProcessor.INDEX, "test-errors-batches");
+ runner.setProperty(PutElasticsearchRecord.BATCH_SIZE, "2");
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+ runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true");
+
+ // allow multiple reads of the input FlowFile
+ runner.setAllowRecursiveReads(true);
+
+ runner.enqueue(json);
+ runner.run();
+
+ // records 1, 3 & 4 (output in 2 FlowFiles) should succeed and set the
index mapping of "foo" to boolean, with id field retained
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 2);
+
+ final MockFlowFile successful1 =
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst();
+ successful1.assertAttributeEquals("record.count", "1");
+ final String successful1Content = successful1.getContent();
+ assertTrue(successful1Content.contains("\"id\":\"1\""),
successful1Content);
+ assertTrue(successful1Content.contains("\"foo\":false"),
successful1Content);
+
+ final MockFlowFile successful2 =
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).get(1);
+ successful2.assertAttributeEquals("record.count", "2");
+ final String successful2Content = successful2.getContent();
+ assertTrue(successful2Content.contains("\"id\":\"3\""),
successful2Content);
+ assertTrue(successful2Content.contains("\"id\":\"4\""),
successful2Content);
+
+ // record 2 & 5 (in 2 batches) should fail because an int cannot be
indexed into a boolean field, the id field should remain in the error output
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 2);
+
+ final MockFlowFile error1 =
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst();
+ error1.assertAttributeEquals("record.count", "1");
+ final String error1Content = error1.getContent();
+ assertTrue(error1Content.contains("\"id\":\"2\""), error1Content);
+ assertTrue(error1Content.contains("\"foo\":123"), error1Content);
+
+ final MockFlowFile error2 = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).get(1);
+ error2.assertAttributeEquals("record.count", "1");
+ final String error2Content = error2.getContent();
+ assertTrue(error2Content.contains("\"id\":\"5\""), error2Content);
+ assertTrue(error2Content.contains("\"foo\":456"), error2Content);
+ }
+
+ @Test
+ void testUpdateError() {
+ final String json = """
+ {"id": "123", "foo": "bar"}
+ """;
+
+ runner.setProperty(AbstractPutElasticsearch.INDEX_OP, "update");
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+
+ runner.enqueue(json);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+ final String errorContent = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent();
+ assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
+ assertTrue(errorContent.contains("\"foo\":\"bar\""), errorContent);
+ }
+
+ @Test
+ void testUpdateScriptError() {
+ final String json = """
{"id": "123", "script": {"source": "ctx._source.counter += params.count"}}
+ """;
+
+ runner.setProperty(AbstractPutElasticsearch.INDEX_OP, "update");
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+ runner.setProperty(PutElasticsearchRecord.SCRIPT_RECORD_PATH, "/script");
+
+ runner.enqueue(json);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+ final String errorContent = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent();
+ assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
+ assertTrue(errorContent.contains("\"script\":{"), errorContent);
+ }
+
+ @Test
+ void testScriptedUpsertError() {
+ final String json = """
{"id": "123", "script": {"source": "ctx._source.counter += params.count"}, "upsert": true}
+ """;
+
+ runner.setProperty(AbstractPutElasticsearch.INDEX_OP, "upsert");
+ runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+ runner.setProperty(PutElasticsearchRecord.SCRIPT_RECORD_PATH, "/script");
+ runner.setProperty(PutElasticsearchRecord.SCRIPTED_UPSERT_RECORD_PATH, "/upsert");
+
+ runner.enqueue(json);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+ final String errorContent = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst().getContent();
+ assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
+ assertTrue(errorContent.contains("\"script\":{"), errorContent);
+ }
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java
index 9f3d6fd566..1d641912de 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java
@@ -121,12 +121,12 @@ public class AbstractMockElasticsearchClient extends AbstractControllerService i
return String.format("http://localhost:9200/%s/%s", index, StringUtils.isNotBlank(type) ? type : "");
}
- public boolean isThrowRetriableError() {
- return throwRetriableError;
+ public boolean isThrowRetryableError() {
+ return throwRetryableError;
}
- public void setThrowRetriableError(final boolean throwRetriableError) {
- this.throwRetriableError = throwRetriableError;
+ public void setThrowRetryableError(final boolean throwRetryableError) {
+ this.throwRetryableError = throwRetryableError;
}
public boolean isThrowFatalError() {
@@ -137,6 +137,6 @@ public class AbstractMockElasticsearchClient extends AbstractControllerService i
this.throwFatalError = throwFatalError;
}
- private boolean throwRetriableError;
+ private boolean throwRetryableError;
private boolean throwFatalError;
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java
index 55d89e4600..a2a4af6aa1 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java
@@ -30,7 +30,7 @@ public class MockBulkLoadClientService extends AbstractMockElasticsearchClient {
@Override
public IndexOperationResponse bulk(final List<IndexOperationRequest> items, final Map<String, String> requestParameters) {
- if (isThrowRetriableError()) {
+ if (isThrowRetryableError()) {
throw new MockElasticsearchException(true, false);
} else if (isThrowFatalError()) {
throw new MockElasticsearchException(false, false);
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 163b8fad99..c7e811496d 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -52,7 +52,7 @@ import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
- .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.17.0"));
+ .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.17.2"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
@@ -79,7 +79,7 @@ public abstract class AbstractElasticsearchITBase {
protected static final boolean ENABLE_TEST_CONTAINERS = "true".equalsIgnoreCase(System.getProperty("elasticsearch.testcontainers.enabled"));
protected static String elasticsearchHost;
- protected static void startTestcontainer() {
+ protected static void startTestContainer() {
if (ENABLE_TEST_CONTAINERS) {
if (getElasticMajorVersion() == 6) {
// disable system call filter check to allow Elasticsearch 6 to run on aarch64 machines (e.g. Mac M1/2)
@@ -99,7 +99,7 @@ public abstract class AbstractElasticsearchITBase {
static RestClient testDataManagementClient;
- protected static void stopTestcontainer() {
+ protected static void stopTestContainer() {
if (ENABLE_TEST_CONTAINERS) {
ELASTICSEARCH_CONTAINER.stop();
}
@@ -107,7 +107,7 @@ public abstract class AbstractElasticsearchITBase {
@BeforeAll
static void beforeAll() throws IOException {
- startTestcontainer();
+ startTestContainer();
type = getElasticMajorVersion() == 6 ? "_doc" : "";
System.out.printf("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nIMAGE: %s:%s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",
type, IMAGE.getRepository(), IMAGE.getVersionPart());
@@ -201,23 +201,13 @@ public abstract class AbstractElasticsearchITBase {
return actions;
}
- private static final class SetupAction {
- private final String verb;
- private final String path;
- private final String json;
-
- public SetupAction(final String verb, final String path, final String json) {
- this.verb = verb;
- this.path = path;
- this.json = json;
- }
-
+ private record SetupAction(String verb, String path, String json) {
@Override
- public String toString() {
- return "SetupAction{" +
- "verb='" + verb + '\'' +
- ", path='" + path + '\'' +
- '}';
+ public String toString() {
+ return "SetupAction{" +
+ "verb='" + verb + '\'' +
+ ", path='" + path + '\'' +
+ '}';
+ }
}
- }
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
index dd7f1c4153..1e78810b06 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -33,7 +33,7 @@ language governing permissions and limitations under the License. -->
</modules>
<properties>
- <elasticsearch.client.version>8.17.1</elasticsearch.client.version>
+ <elasticsearch.client.version>8.17.2</elasticsearch.client.version>
</properties>
<dependencyManagement>
@@ -114,7 +114,7 @@ language governing permissions and limitations under the License. -->
<profile>
<id>elasticsearch7</id>
<properties>
- <elasticsearch_docker_image>7.17.26</elasticsearch_docker_image>
+ <elasticsearch_docker_image>7.17.27</elasticsearch_docker_image>
</properties>
</profile>
</profiles>
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index eae7706786..f73971f4a8 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -87,9 +87,10 @@ public class MockProcessSession implements ProcessSession {
private final Map<FlowFile, OutputStream> openOutputStreams = new HashMap<>();
private final StateManager stateManager;
private final boolean allowSynchronousCommits;
+ private final boolean allowRecursiveReads;
private boolean committed = false;
- private boolean rolledback = false;
+ private boolean rolledBack = false;
private final Set<Long> removedFlowFiles = new HashSet<>();
private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
@@ -108,6 +109,11 @@ public class MockProcessSession implements ProcessSession {
public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager,
final boolean allowSynchronousCommits) {
+ this(sharedState, processor, enforceStreamsClosed, stateManager, allowSynchronousCommits, false);
+ }
+
+ public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager,
+ final boolean allowSynchronousCommits, final boolean allowRecursiveReads) {
this.processor = processor;
this.enforceStreamsClosed = enforceStreamsClosed;
this.sharedState = sharedState;
@@ -115,6 +121,7 @@ public class MockProcessSession implements ProcessSession {
this.provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
this.stateManager = stateManager;
this.allowSynchronousCommits = allowSynchronousCommits;
+ this.allowRecursiveReads = allowRecursiveReads;
}
@Override
@@ -337,7 +344,7 @@ public class MockProcessSession implements ProcessSession {
* session
*/
public void clearRollback() {
- rolledback = false;
+ rolledBack = false;
}
@Override
@@ -822,7 +829,7 @@ public class MockProcessSession implements ProcessSession {
}
}
- rolledback = true;
+ rolledBack = true;
beingProcessed.clear();
currentVersions.clear();
originalVersions.clear();
@@ -1092,7 +1099,7 @@ public class MockProcessSession implements ProcessSession {
private List<FlowFile> validateState(final Collection<FlowFile> flowFiles) {
return flowFiles.stream()
- .map(ff -> validateState(ff))
+ .map(this::validateState)
.collect(Collectors.toList());
}
@@ -1104,7 +1111,7 @@ public class MockProcessSession implements ProcessSession {
throw new FlowFileHandlingException(flowFile + " is not known in this session");
}
- if (readRecursionSet.containsKey(flowFile)) {
+ if (readRecursionSet.containsKey(flowFile) && !allowRecursiveReads) {
throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
@@ -1234,14 +1241,14 @@ public class MockProcessSession implements ProcessSession {
* Assert that {@link #rollback()} has been called
*/
public void assertRolledBack() {
- Assertions.assertTrue(rolledback, "Session was not rolled back");
+ Assertions.assertTrue(rolledBack, "Session was not rolled back");
}
/**
* Assert that {@link #rollback()} has not been called
*/
public void assertNotRolledBack() {
- Assertions.assertFalse(rolledback, "Session was rolled back");
+ Assertions.assertFalse(rolledBack, "Session was rolled back");
}
/**
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
index a5d9ff9f40..4fc53ec711 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
@@ -33,19 +33,21 @@ public class MockSessionFactory implements ProcessSessionFactory {
private final boolean enforceReadStreamsClosed;
private final StateManager stateManager;
private final boolean allowSynchronousSessionCommits;
+ private final boolean allowRecursiveReads;
MockSessionFactory(final SharedSessionState sharedState, final Processor processor, final boolean enforceReadStreamsClosed, final StateManager stateManager,
- final boolean allowSynchronousSessionCommits) {
+ final boolean allowSynchronousSessionCommits, final boolean allowRecursiveReads) {
this.sharedState = sharedState;
this.processor = processor;
this.enforceReadStreamsClosed = enforceReadStreamsClosed;
this.stateManager = stateManager;
this.allowSynchronousSessionCommits = allowSynchronousSessionCommits;
+ this.allowRecursiveReads = allowRecursiveReads;
}
@Override
public ProcessSession createSession() {
- final MockProcessSession session = new MockProcessSession(sharedState, processor, enforceReadStreamsClosed, stateManager, allowSynchronousSessionCommits);
+ final MockProcessSession session = new MockProcessSession(sharedState, processor, enforceReadStreamsClosed, stateManager, allowSynchronousSessionCommits, allowRecursiveReads);
createdSessions.add(session);
return session;
}
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 292c7a9e37..68d1e90e20 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -95,6 +95,7 @@ public class StandardProcessorTestRunner implements TestRunner {
private int numThreads = 1;
private MockSessionFactory sessionFactory;
private boolean allowSynchronousSessionCommits = false;
+ private boolean allowRecursiveReads = false;
private long runSchedule = 0;
private final AtomicInteger invocations = new AtomicInteger(0);
@@ -128,7 +129,7 @@ public class StandardProcessorTestRunner implements TestRunner {
this.sharedState = new SharedSessionState(processor, idGenerator);
this.flowFileQueue = sharedState.getFlowFileQueue();
this.processorStateManager = new MockStateManager(processor);
- this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed, processorStateManager, allowSynchronousSessionCommits);
+ this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed, processorStateManager, allowSynchronousSessionCommits, allowRecursiveReads);
this.context = new MockProcessContext(processor, processorName, processorStateManager, environmentVariables);
this.kerberosContext = kerberosContext;
@@ -149,7 +150,7 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void enforceReadStreamsClosed(final boolean enforce) {
enforceReadStreamsClosed = enforce;
- this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed, processorStateManager, allowSynchronousSessionCommits);
+ this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed, processorStateManager, allowSynchronousSessionCommits, allowRecursiveReads);
}
@Override
@@ -161,7 +162,13 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void setAllowSynchronousSessionCommits(final boolean allowSynchronousSessionCommits) {
this.allowSynchronousSessionCommits = allowSynchronousSessionCommits;
- this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed, processorStateManager, allowSynchronousSessionCommits);
+ this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed, processorStateManager, allowSynchronousSessionCommits, allowRecursiveReads);
+ }
+
+ @Override
+ public void setAllowRecursiveReads(final boolean allowRecursiveReads) {
+ this.allowRecursiveReads = allowRecursiveReads;
+ this.sessionFactory = new MockSessionFactory(sharedState, processor, enforceReadStreamsClosed, processorStateManager, allowSynchronousSessionCommits, allowRecursiveReads);
}
@Override
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 57f39f23e7..0f5c1ecf73 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -891,13 +891,21 @@ public interface TestRunner {
void setValidateExpressionUsage(boolean validate);
/**
- * Specifies whether or not the TestRunner will allow ProcessSession.commit() to be called.
+ * Specifies whether the TestRunner will allow ProcessSession.commit() to be called.
* By default, the value is <code>false</code>, meaning that any call to ProcessSession.commit() will throw
* an Exception. See JavaDocs for {@link ProcessSession#commit()} for more information
- * @param allow whethr or not to allow asynchronous session commits (i.e., calls to ProcessSession.commit())
+ * @param allow whether to allow asynchronous session commits (i.e., calls to ProcessSession.commit())
*/
void setAllowSynchronousSessionCommits(boolean allow);
+ /**
+ * Specifies whether the TestRunner will allow ProcessSession.read() multiple times for the same FlowFile while an InputStream is already open.
+ * By default, the value is <code>false</code>, meaning that any call to ProcessSession.read() for a FlowFile already being read will throw
+ * an Exception. See JavaDocs for {@link ProcessSession#read(FlowFile)} for more information
+ * @param allow whether to allow recursive reads of a FlowFile (i.e., calls to ProcessSession.read())
+ */
+ void setAllowRecursiveReads(boolean allow);
+
/**
* Removes the {@link PropertyDescriptor} from the {@link ProcessContext},
* effectively setting its value to null, or the property's default value, if it has one.
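
The `allowRecursiveReads` guard this commit adds to `MockProcessSession.validateState` can be illustrated in isolation. The following standalone sketch (hypothetical class and method names, not NiFi code) mimics the mechanism: a read of a FlowFile that is already inside an active read callback is rejected unless recursive reads have been explicitly allowed, mirroring the new flag threaded through `MockSessionFactory` and `TestRunner.setAllowRecursiveReads`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the recursion guard pattern used by the commit:
// track FlowFiles with open reads and reject a nested read unless the
// allowRecursiveReads flag is set.
final class RecursiveReadGuard {
    private final Map<String, Integer> readRecursionSet = new HashMap<>();
    private final boolean allowRecursiveReads;

    RecursiveReadGuard(final boolean allowRecursiveReads) {
        this.allowRecursiveReads = allowRecursiveReads;
    }

    void beginRead(final String flowFileId) {
        if (readRecursionSet.containsKey(flowFileId) && !allowRecursiveReads) {
            throw new IllegalStateException(flowFileId + " already in use for an active callback");
        }
        // count nested reads so endRead can unwind them in order
        readRecursionSet.merge(flowFileId, 1, Integer::sum);
    }

    void endRead(final String flowFileId) {
        readRecursionSet.computeIfPresent(flowFileId, (id, count) -> count == 1 ? null : count - 1);
    }

    public static void main(String[] args) {
        final RecursiveReadGuard strict = new RecursiveReadGuard(false);
        strict.beginRead("ff-1");
        boolean rejected = false;
        try {
            strict.beginRead("ff-1"); // second concurrent read of the same FlowFile
        } catch (IllegalStateException e) {
            rejected = true;
        }
        System.out.println("strict rejected: " + rejected);

        final RecursiveReadGuard permissive = new RecursiveReadGuard(true);
        permissive.beginRead("ff-2");
        permissive.beginRead("ff-2"); // allowed once recursive reads are enabled
        System.out.println("permissive allowed: " + true);
        permissive.endRead("ff-2");
        permissive.endRead("ff-2");
    }
}
```

This is why `testErrorRecordsOutputMultipleBatches` above calls `runner.setAllowRecursiveReads(true)`: the processor re-reads the input FlowFile once per batch while an earlier read stream may still be open, which the mock session would otherwise reject.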