[
https://issues.apache.org/jira/browse/NIFI-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stanislas Deneuville updated NIFI-8620:
---------------------------------------
Description:
{color:#000000}In my use case I read and process a large file and produce
smaller results along the way. I want to commit the results that are already
complete at regular intervals, rather than waiting for the whole file to be
processed.{color}
{color:#000000}Taking inspiration from BinManager, I created a custom
processor that uses two ProcessSessions:{color}
* {color:#000000}a main session that reads the input flowfile and creates
new flowfiles forked from it{color}
* {color:#000000}a second session that commits results on the fly{color}
{color:#000000}The workflow looks roughly like this:{color}
{code:java}
final ProcessSession mainSession = sessionFactory.createSession();
final ProcessSession secondSession = sessionFactory.createSession();
FlowFile inputFlowFile = mainSession.get();
try (InputStream in = mainSession.read(inputFlowFile)) {
    while (stillSomethingToRead) {
        // read and process data
        inputData = in.read(...);
        transformedData = transform(inputData);
        // Create output flowfile, forked from the input flowfile
        FlowFile outputFlowFile = mainSession.create(inputFlowFile);
        // write transformedData to outputFlowFile content
        [...]
        // also put some attributes on outputFlowFile
        [...]
        // Output the results without waiting
        mainSession.migrate(secondSession, Collections.singletonList(outputFlowFile));
        secondSession.transfer(outputFlowFile, successRelationship);
        secondSession.commit();
    }
}
mainSession.commit();
{code}
This works with the NiFi mock framework, but in a real NiFi environment I get
a NullPointerException during the commit.
{noformat}
[id=9f6342ac-ae78-30f7-22cf-6d7517618f19] Unknown error occurred: java.lang.NullPointerException
java.lang.NullPointerException: null
    at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
    at org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332)
{noformat}
{color:#000000}Note: my code does not do anything related to data
provenance.{color}
was:
{color:#000000}In my use case I read and process a large file and produce
smaller results along the way. I want to commit the results that are already
complete at regular intervals, rather than waiting for the whole file to be
processed.{color}
{color:#000000}Taking inspiration from BinManager, I created a custom
processor that uses two ProcessSessions:{color} * {color:#000000}a main
session that reads the input flowfile and creates new flowfiles forked from
it{color}
* {color:#000000}a second session that commits results on the fly{color}
{color:#000000}{color:#000000}The workflow looks roughly like this:{color}{color}
{code:java}
final ProcessSession mainSession = sessionFactory.createSession();
final ProcessSession secondSession = sessionFactory.createSession();
FlowFile inputFlowFile = mainSession.get();
try (InputStream in = mainSession.read(inputFlowFile)) {
    while (stillSomethingToRead) {
        // read and process data
        inputData = in.read(...);
        transformedData = transform(inputData);
        // Create output flowfile, forked from the input flowfile
        FlowFile outputFlowFile = mainSession.create(inputFlowFile);
        // write transformedData to outputFlowFile content
        [...]
        // also put some attributes on outputFlowFile
        [...]
        // Output the results without waiting
        mainSession.migrate(secondSession, Collections.singletonList(outputFlowFile));
        secondSession.transfer(outputFlowFile, successRelationship);
        secondSession.commit();
    }
}
mainSession.commit();
{code}
{color:#000000}This works with the NiFi mock framework, but in a real NiFi
environment I get a NullPointerException during the commit.{color}
{noformat}
[id=9f6342ac-ae78-30f7-22cf-6d7517618f19] Unknown error occurred: java.lang.NullPointerException
java.lang.NullPointerException: null
    at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
    at org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332)
{noformat}
{color:#000000}Note: my code does not do anything related to data
provenance.{color}
> NullPointerException on commit with multiple ProcessSession
> -----------------------------------------------------------
>
> Key: NIFI-8620
> URL: https://issues.apache.org/jira/browse/NIFI-8620
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 1.13.0
> Environment: Kubernetes on linux
> Reporter: Stanislas Deneuville
> Priority: Major
>
> {color:#000000}In my use case I read and process a large file and produce
> smaller results along the way. I want to commit the results that are already
> complete at regular intervals, rather than waiting for the whole file to be
> processed.{color}
> {color:#000000}Taking inspiration from BinManager, I created a custom
> processor that uses two ProcessSessions:{color}
> * {color:#000000}a main session that reads the input flowfile and creates
> new flowfiles forked from it{color}
> * {color:#000000}a second session that commits results on the fly{color}
>
> {color:#000000}The workflow looks roughly like this:{color}
> {code:java}
> final ProcessSession mainSession = sessionFactory.createSession();
> final ProcessSession secondSession = sessionFactory.createSession();
> FlowFile inputFlowFile = mainSession.get();
> try (InputStream in = mainSession.read(inputFlowFile)) {
>     while (stillSomethingToRead) {
>         // read and process data
>         inputData = in.read(...);
>         transformedData = transform(inputData);
>         // Create output flowfile, forked from the input flowfile
>         FlowFile outputFlowFile = mainSession.create(inputFlowFile);
>         // write transformedData to outputFlowFile content
>         [...]
>         // also put some attributes on outputFlowFile
>         [...]
>         // Output the results without waiting
>         mainSession.migrate(secondSession, Collections.singletonList(outputFlowFile));
>         secondSession.transfer(outputFlowFile, successRelationship);
>         secondSession.commit();
>     }
> }
> mainSession.commit();
> {code}
>
>
> This works with the NiFi mock framework, but in a real NiFi environment I get
> a NullPointerException during the commit.
>
> {noformat}
> [id=9f6342ac-ae78-30f7-22cf-6d7517618f19] Unknown error occurred: java.lang.NullPointerException
> java.lang.NullPointerException: null
>     at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
>     at org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
>     at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
>     at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332)
> {noformat}
>
> {color:#000000}Note: my code does not do anything related to data
> provenance.{color}