[ 
https://issues.apache.org/jira/browse/NIFI-15001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Turley updated NIFI-15001:
-------------------------------
    Description: 
When PutElasticsearchRecord's batch size is larger than the number of records in a flow 
file, everything is fine. However, if I have a 2000-record flow file and the 
batch size is set to 100, it fails with the following error:
{code:java}
ERROR PutElasticsearchRecord[id=52bf2533-0199-1000-5339-6fb2ce41a3f9] Processing halted: yielding [1 sec]: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=bb88525f-0ee2-47cf-83ca-f2a2072f5790,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1758032959020-2346, container=default, section=298], offset=0, length=316899],offset=0,name=1375079273674113,size=316899] is not known in this session (StandardProcessSession[id=40715])
14:36:13 UTC ERROR PutElasticsearchRecord[id=52bf2533-0199-1000-5339-6fb2ce41a3f9] Could not index documents.: org.apache.nifi.processor.exception.FlowFileAccessException: Could not read from StandardFlowFileRecord[uuid=bb88525f-0ee2-47cf-83ca-f2a2072f5790,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1758032959020-2346, container=default, section=298], offset=0, length=316899],offset=0,name=1375079273674113,size=316899] - Caused by: java.io.IOException: Stream closed
{code}
 

It appears the processor was altered in 2.3.0. I was able to make the code work 
again by altering this section of PutElasticsearchRecord.java:

{code:java}
    private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input,
                                           final IndexOperationParameters indexOperationParameters, final int batch)
            throws IOException, SchemaNotFoundException {
        final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), indexOperationParameters.getElasticsearchRequestOptions());
        final Map<Integer, Map<String, Object>> errors = findElasticsearchResponseErrors(response);
        if (!errors.isEmpty()) {
            handleElasticsearchDocumentErrors(errors, session, input);
        }
        final int numErrors = errors.size();
        final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
        final Map<String, Output> outputs = new HashMap<>();
        try {
            for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
                final String type;
                final Relationship relationship;
                final Map<String, Object> error;
                final Record outputRecord = bundle.getOriginalRecords().get(o);
                final RecordSchema recordSchema = outputRecord.getSchema();
                if (numErrors > 0 && errors.containsKey(o)) {
                    relationship = REL_ERRORS;
                    error = errors.get(o);
                    if (groupBulkErrors) {
                        if (isElasticsearchNotFound().test(error)) {
                            type = OUTPUT_TYPE_NOT_FOUND;
                        } else {
                            type = getErrorType(error);
                        }
                    } else {
                        type = OUTPUT_TYPE_ERROR;
                    }
                } else {
                    relationship = REL_SUCCESSFUL;
                    error = null;
                    type = OUTPUT_TYPE_SUCCESS;
                }
                final Output output = getOutputByType(outputs, type, session, relationship, input, recordSchema);
                output.write(outputRecord, error);
            }
            for (final Output output : outputs.values()) {
                output.transfer(session);
            }
        } catch (final IOException | SchemaNotFoundException ex) {
            getLogger().error("Unable to write error/successful records", ex);
            outputs.values().forEach(o -> {
                try {
                    o.remove(session);
                } catch (IOException ioe) {
                    getLogger().warn("Error closing RecordSetWriter for FlowFile", ioe);
                }
            });
            throw ex;
        }
        return new ResponseDetails(outputs, numSuccessful, numErrors);
    }
{code}
After this change the unit test will fail, but when used in NiFi the processor 
is able to batch properly.
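
As a rough illustration of the suspected failure mode (a hypothetical model, not NiFi code; the names {{Session}} and {{index_in_batches}} are invented for this sketch): the errors above suggest the input FlowFile is no longer known to the session by the time a later batch tries to derive output from it. The model below simulates a session that forgets a flow file once it is transferred; a single batch succeeds, but splitting the same input across multiple batches with per-batch transfer of the input fails on the second batch, matching the reported behaviour.

```python
# Hypothetical model of the suspected bug; none of these names exist in NiFi.

class Session:
    """Toy stand-in for a NiFi ProcessSession: it only 'knows' flow files
    that have been registered and not yet transferred out."""

    def __init__(self, *known):
        self.known = set(known)

    def create_child(self, parent):
        # Deriving an output requires the parent to still be in the session.
        if parent not in self.known:
            raise RuntimeError(f"{parent} is not known in this session")
        child = f"{parent}/child{len(self.known)}"
        self.known.add(child)
        return child

    def transfer(self, flow_file):
        # Once transferred, the flow file leaves the session.
        self.known.discard(flow_file)


def index_in_batches(session, input_ff, num_records, batch_size,
                     transfer_input_per_batch):
    """Process records batch by batch; if transfer_input_per_batch is True,
    the input is (wrongly) transferred after every batch, so any second
    batch can no longer derive output from it."""
    for _start in range(0, num_records, batch_size):
        out = session.create_child(input_ff)
        session.transfer(out)
        if transfer_input_per_batch:
            session.transfer(input_ff)  # bug: input gone before next batch
    session.transfer(input_ff)


# One batch (batch size >= record count): fine even with the bad bookkeeping.
index_in_batches(Session("input"), "input", 100, 100, True)
# 2000 records in batches of 100 with the bad bookkeeping: second batch fails.
```

With correct bookkeeping (transfer the input only once, after all batches are written) every batch succeeds; the 2.3.0 change presumably broke that invariant somewhere in the per-batch output handling.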



> PutElasticsearchRecord bulk requests failing
> --------------------------------------------
>
>                 Key: NIFI-15001
>                 URL: https://issues.apache.org/jira/browse/NIFI-15001
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>            Reporter: Adam Turley
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
