session.transfer(output, REL_ORIGINAL);

should be

   session.transfer(inputFlowFile, REL_ORIGINAL);

On Wed, Jul 24, 2019 at 3:35 PM Mike Thomsen <[email protected]> wrote:

> Also, here is a pattern we tend to use for reading using try-with
>
>
> FlowFile output = session.create(inputFlowFile); //create linkage in
> provenance
> try (InputStream is = session.read(inputFlowFile); OutputStream os =
> session.write(output);) {
>     //Do stuff with streams.
>
>     is.close();
>    os.close(); //These need to be closed before transfer
>    session.transfer(output, REL_SUCCESS);
>    session.transfer(output, REL_ORIGINAL);
> } catch (Exception ex) {
> getLogger().error("", ex);
>     session.remove(output);
>     session.transfer(inputFlowFile, REL_ERROR);
> }
>
> On Wed, Jul 24, 2019 at 2:28 PM Mike Thomsen <[email protected]>
> wrote:
>
>> Two problems I see right here are this:
>>
>> flowfile = session.create();
>>     flowfile = session.get();
>>
>>
>> The second call will replace the newly created flowfile that comes from
>> session.create() with either null or the next element in the queue. That
>> will result in you having a flowfile created in the session that you cannot
>> either transfer to a downstream relationship with session.transfer() or
>> remove with session.remove(). End result there will be the session will
>> fail to commit because it doesn't know what it should do with that empty
>> flowfile.
>>
>> Second:
>>
>> new FileReader("JSON_PATH")
>>
>>
>> That will result in java.io.FileReader looking for a file named
>> "JSON_PATH" in the current working folder. Unless that is your intent, you
>> need to remove the quotes and make sure it's pointing to a real path on the
>> file system.
>>
>> On Tue, Jul 23, 2019 at 10:13 PM Farooq Mustafa <[email protected]>
>> wrote:
>>
>>> Hello,
>>>
>>>
>>> I am writing my first custom processor. I followed the example here
>>> https://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/
>>>
>>>
>>> In the settings, I have /tmp/jsoniput/sample1.json file
>>>
>>>
>>> The source code;
>>>
>>> public void onTrigger(final ProcessContext context, final ProcessSession 
>>> session) throws ProcessException {
>>>
>>>   final AtomicReference<String> value = new AtomicReference<>();
>>>   getLogger().warn("sample - Farooq - 1 , value = <" + value.toString() + 
>>> ">");
>>>
>>>   FlowFile flowfile = session.get();
>>>   getLogger().warn("sample - Farooq - 2");
>>>
>>>   if ( null == flowfile) {
>>>     getLogger().warn("sample - Farooq - 2.1 - flowfile is NULL *** ");
>>>     flowfile = session.create();
>>>     flowfile = session.get();
>>>     getLogger().warn("sample - Farooq - 2.2 - session.create() / 
>>> session.get() --- NOT SURE ");
>>>   }
>>>   getLogger().warn("sample - Farooq - 2.3 JSON_PATH :" + 
>>> context.getProperty(JSON_PATH).getValue());
>>>
>>>   session.read(flowfile, new InputStreamCallback() {
>>>     @Override
>>>     public void process(InputStream in) throws IOException {
>>>
>>>       try {
>>>         getLogger().warn("sample - Farooq - 3 JSON_PATH :" + 
>>> context.getProperty(JSON_PATH).getValue());
>>>         Object obj = new JSONParser().parse(new FileReader("JSON_PATH"));
>>>         getLogger().warn("sample - Farooq - 3.1 ");
>>>
>>>
>>>
>>> The log output:
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
>>> o.a.nifi.processors.sitc.JsonProcessor
>>> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 1
>>> , value = <null>
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
>>> o.a.nifi.processors.sitc.JsonProcessor
>>> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
>>> o.a.nifi.processors.sitc.JsonProcessor
>>> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq -
>>> 2.1 - flowfile is NULL ***
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
>>> o.a.nifi.processors.sitc.JsonProcessor
>>> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq -
>>> 2.2 - session.create() / session.get() --- NOT SURE
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
>>> o.a.nifi.processors.sitc.JsonProcessor
>>> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq -
>>> 2.3 JSON_PATH :/tmp/jsoninput
>>>
>>> 2019-07-20 13:58:42,626 ERROR [Timer-Driven Process Thread-4]
>>> o.a.nifi.processors.sitc.JsonProcessor
>>> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267]
>>> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] failed to process
>>> session due to java.lang.NullPointerException; Processor Administratively
>>> Yielded for 1 sec: java.lang.NullPointerException
>>>
>>> java.lang.NullPointerException: null
>>>
>>> at
>>> org.apache.nifi.controller.repository.StandardProcessSession.getRecord(StandardProcessSession.java:574)
>>>
>>> at
>>> org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3132)
>>>
>>> at
>>> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2187)
>>>
>>>
>>> And the basic flow:
>>> [image: image.png]
>>>
>>>
>>>
>>> Any ideas or guidance will be appreciated.
>>>
>>>
>>> Thanks
>>>
>>> Farooq
>>>
>>

Reply via email to