[
https://issues.apache.org/jira/browse/NIFI-15001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Adam Turley updated NIFI-15001:
-------------------------------
Description:
When PutElasticsearchRecord's batch size is larger than the number of records
in a flow file, everything is fine. However, if I have a 2000-record flow file
and my batch size is set to 100, it will fail with the following error:
{code:java}
ERROR PutElasticsearchRecord[id=52bf2533-0199-1000-5339-6fb2ce41a3f9]
Processing halted: yielding [1 sec]:
org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=bb88525f-0ee2-47cf-83ca-f2a2072f5790,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1758032959020-2346, container=default,
section=298], offset=0,
length=316899],offset=0,name=1375079273674113,size=316899] is not known in this
session (StandardProcessSession[id=40715]) 14:36:13 UTC ERROR
PutElasticsearchRecord[id=52bf2533-0199-1000-5339-6fb2ce41a3f9] Could not index
documents.: org.apache.nifi.processor.exception.FlowFileAccessException: Could
not read from
StandardFlowFileRecord[uuid=bb88525f-0ee2-47cf-83ca-f2a2072f5790,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1758032959020-2346, container=default,
section=298], offset=0,
length=316899],offset=0,name=1375079273674113,size=316899] - Caused by:
java.io.IOException: Stream closed
{code}
It appears that in 2.3.0 the processor was altered. I was able to make the code
work again by altering this section of PutElasticsearchRecord.java:
{code:java}
private ResponseDetails indexDocuments(final BulkOperation bundle, final
ProcessSession session, final FlowFile input,
final IndexOperationParameters
indexOperationParameters, final int batch)
throws IOException, SchemaNotFoundException {
final IndexOperationResponse response =
clientService.get().bulk(bundle.getOperationList(),
indexOperationParameters.getElasticsearchRequestOptions());
final Map<Integer, Map<String, Object>> errors =
findElasticsearchResponseErrors(response);
if (!errors.isEmpty()) {
handleElasticsearchDocumentErrors(errors, session, input);
}
final int numErrors = errors.size();
final int numSuccessful = response.getItems() == null ? 0 :
response.getItems().size() - numErrors;
final Map<String, Output> outputs = new HashMap<>();
try {
for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
final String type;
final Relationship relationship;
final Map<String, Object> error;
final Record outputRecord = bundle.getOriginalRecords().get(o);
final RecordSchema recordSchema = outputRecord.getSchema();
if (numErrors > 0 && errors.containsKey(o)) {
relationship = REL_ERRORS;
error = errors.get(o);
if (groupBulkErrors) {
if (isElasticsearchNotFound().test(error)) {
type = OUTPUT_TYPE_NOT_FOUND;
} else {
type = getErrorType(error);
}
} else {
type = OUTPUT_TYPE_ERROR;
}
} else {
relationship = REL_SUCCESSFUL;
error = null;
type = OUTPUT_TYPE_SUCCESS;
}
final Output output = getOutputByType(outputs, type, session,
relationship, input, recordSchema);
output.write(outputRecord, error);
}
for (final Output output : outputs.values()) {
output.transfer(session);
}
} catch (final IOException | SchemaNotFoundException ex) {
getLogger().error("Unable to write error/successful records", ex);
outputs.values().forEach(o -> {
try {
o.remove(session);
} catch (IOException ioe) {
getLogger().warn("Error closing RecordSetWriter for
FlowFile", ioe);
}
});
throw ex;
}
return new ResponseDetails(outputs, numSuccessful, numErrors);
}
{code}
After this change the existing test will fail, but when used in NiFi it is able
to batch properly.
was:
When PutElasticsearchRecord's batch size is larger than the records in a flow
file everything is fine. however, if i have a 2000 record flow file and my
batch size is set to 100 it will fail with the following error:
{code:java}
ERROR PutElasticsearchRecord[id=52bf2533-0199-1000-5339-6fb2ce41a3f9]
Processing halted: yielding [1 sec]:
org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=bb88525f-0ee2-47cf-83ca-f2a2072f5790,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1758032959020-2346, container=default,
section=298], offset=0,
length=316899],offset=0,name=1375079273674113,size=316899] is not known in this
session (StandardProcessSession[id=40715]) 14:36:13 UTC ERROR
PutElasticsearchRecord[id=52bf2533-0199-1000-5339-6fb2ce41a3f9] Could not index
documents.: org.apache.nifi.processor.exception.FlowFileAccessException: Could
not read from
StandardFlowFileRecord[uuid=bb88525f-0ee2-47cf-83ca-f2a2072f5790,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1758032959020-2346, container=default,
section=298], offset=0,
length=316899],offset=0,name=1375079273674113,size=316899] - Caused by:
java.io.IOException: Stream closed
{code}
It appears in 2.3.0 the processor was altered. I was able to make the code work
again by altering this section of PutElasticsearchRecord.java:
{code:java}
private ResponseDetails indexDocuments(final BulkOperation bundle, final
ProcessSession session, final FlowFile input,
final IndexOperationParameters indexOperationParameters, final int
batch) throws IOException, SchemaNotFoundException { final
IndexOperationResponse response =
clientService.get().bulk(bundle.getOperationList(),
indexOperationParameters.getElasticsearchRequestOptions());
final Map<Integer, Map<String, Object>> errors =
findElasticsearchResponseErrors(response); if (!errors.isEmpty()) {
handleElasticsearchDocumentErrors(errors, session, input); }
final int numErrors = errors.size(); final int numSuccessful =
response.getItems() == null ? 0 : response.getItems().size() - numErrors;
final Map<String, Output> outputs = new HashMap<>();
try { for (int o = 0; o <
bundle.getOriginalRecords().size(); o++) { final String type;
final Relationship relationship; final Map<String,
Object> error; final Record outputRecord =
bundle.getOriginalRecords().get(o); final RecordSchema
recordSchema = outputRecord.getSchema(); if (numErrors > 0 &&
errors.containsKey(o)) { relationship = REL_ERRORS;
error = errors.get(o); if (groupBulkErrors) {
if (isElasticsearchNotFound().test(error)) {
type = OUTPUT_TYPE_NOT_FOUND; } else {
type = getErrorType(error); }
} else { type = OUTPUT_TYPE_ERROR;
} } else { relationship =
REL_SUCCESSFUL; error = null; type =
OUTPUT_TYPE_SUCCESS; } final Output output =
getOutputByType(outputs, type, session, relationship, input, recordSchema);
output.write(outputRecord, error); }
for (final Output output : outputs.values()) {
output.transfer(session); } } catch (final IOException |
SchemaNotFoundException ex) { getLogger().error("Unable to write
error/successful records", ex); outputs.values().forEach(o -> {
try { o.remove(session); } catch
(IOException ioe) { getLogger().warn("Error closing
RecordSetWriter for FlowFile", ioe); } });
throw ex; }
return new ResponseDetails(outputs, numSuccessful, numErrors);
}{code}
after this change the test will fail, but when used in nifi it's able to batch
properly.
> PutElasticsearchRecord bulk requests failing
> --------------------------------------------
>
> Key: NIFI-15001
> URL: https://issues.apache.org/jira/browse/NIFI-15001
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
> Reporter: Adam Turley
> Priority: Major
>
> When PutElasticsearchRecord's batch size is larger than the number of records
> in a flow file, everything is fine. However, if I have a 2000-record flow file
> and my batch size is set to 100, it will fail with the following error:
> {code:java}
> ERROR PutElasticsearchRecord[id=52bf2533-0199-1000-5339-6fb2ce41a3f9]
> Processing halted: yielding [1 sec]:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=bb88525f-0ee2-47cf-83ca-f2a2072f5790,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1758032959020-2346,
> container=default, section=298], offset=0,
> length=316899],offset=0,name=1375079273674113,size=316899] is not known in
> this session (StandardProcessSession[id=40715]) 14:36:13 UTC ERROR
> PutElasticsearchRecord[id=52bf2533-0199-1000-5339-6fb2ce41a3f9] Could not
> index documents.:
> org.apache.nifi.processor.exception.FlowFileAccessException: Could not read
> from
> StandardFlowFileRecord[uuid=bb88525f-0ee2-47cf-83ca-f2a2072f5790,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1758032959020-2346,
> container=default, section=298], offset=0,
> length=316899],offset=0,name=1375079273674113,size=316899] - Caused by:
> java.io.IOException: Stream closed
> {code}
>
> It appears that in 2.3.0 the processor was altered. I was able to make the code
> work again by altering this section of PutElasticsearchRecord.java:
>
> {code:java}
> private ResponseDetails indexDocuments(final BulkOperation bundle, final
> ProcessSession session, final FlowFile input,
> final IndexOperationParameters
> indexOperationParameters, final int batch)
> throws IOException, SchemaNotFoundException {
> final IndexOperationResponse response =
> clientService.get().bulk(bundle.getOperationList(),
> indexOperationParameters.getElasticsearchRequestOptions());
> final Map<Integer, Map<String, Object>> errors =
> findElasticsearchResponseErrors(response);
> if (!errors.isEmpty()) {
> handleElasticsearchDocumentErrors(errors, session, input);
> }
> final int numErrors = errors.size();
> final int numSuccessful = response.getItems() == null ? 0 :
> response.getItems().size() - numErrors;
> final Map<String, Output> outputs = new HashMap<>();
> try {
> for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
> final String type;
> final Relationship relationship;
> final Map<String, Object> error;
> final Record outputRecord =
> bundle.getOriginalRecords().get(o);
> final RecordSchema recordSchema = outputRecord.getSchema();
> if (numErrors > 0 && errors.containsKey(o)) {
> relationship = REL_ERRORS;
> error = errors.get(o);
> if (groupBulkErrors) {
> if (isElasticsearchNotFound().test(error)) {
> type = OUTPUT_TYPE_NOT_FOUND;
> } else {
> type = getErrorType(error);
> }
> } else {
> type = OUTPUT_TYPE_ERROR;
> }
> } else {
> relationship = REL_SUCCESSFUL;
> error = null;
> type = OUTPUT_TYPE_SUCCESS;
> }
> final Output output = getOutputByType(outputs, type, session,
> relationship, input, recordSchema);
> output.write(outputRecord, error);
> }
> for (final Output output : outputs.values()) {
> output.transfer(session);
> }
> } catch (final IOException | SchemaNotFoundException ex) {
> getLogger().error("Unable to write error/successful records", ex);
> outputs.values().forEach(o -> {
> try {
> o.remove(session);
> } catch (IOException ioe) {
> getLogger().warn("Error closing RecordSetWriter for
> FlowFile", ioe);
> }
> });
> throw ex;
> }
> return new ResponseDetails(outputs, numSuccessful, numErrors);
> }
>
> {code}
> After this change the existing test will fail, but when used in NiFi it is
> able to batch properly.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)