session.transfer(output, REL_ORIGINAL); should be
session.transfer(inputFlowFile, REL_ORIGINAL);

On Wed, Jul 24, 2019 at 3:35 PM Mike Thomsen <[email protected]> wrote:

> Also, here is a pattern we tend to use for reading using try-with
>
>
> FlowFile output = session.create(inputFlowFile); //create linkage in provenance
> try (InputStream is = session.read(inputFlowFile); OutputStream os = session.write(output);) {
>     //Do stuff with streams.
>
>     is.close();
>     os.close(); //These need to be closed before transfer
>     session.transfer(output, REL_SUCCESS);
>     session.transfer(output, REL_ORIGINAL);
> } catch (Exception ex) {
>     getLogger().error("", ex);
>     session.remove(output);
>     session.transfer(inputFlowFile, REL_ERROR);
> }
>
> On Wed, Jul 24, 2019 at 2:28 PM Mike Thomsen <[email protected]> wrote:
>
>> Two problems I see right here are this:
>>
>> flowfile = session.create();
>> flowfile = session.get();
>>
>>
>> The second call will replace the newly created flowfile that comes from
>> session.create() with either null or the next element in the queue. That
>> will result in you having a flowfile created in the session that you cannot
>> either transfer to a downstream relationship with session.transfer() or
>> remove with session.remove(). End result there will be the session will
>> fail to commit because it doesn't know what it should do with that empty
>> flowfile.
>>
>> Second:
>>
>> new FileReader("JSON_PATH")
>>
>>
>> That will result in java.io.FileReader looking for a file named
>> "JSON_PATH" in the current working folder. Unless that is your intent, you
>> need to remove the quotes and make sure it's pointing to a real path on the
>> file system.
>>
>> On Tue, Jul 23, 2019 at 10:13 PM Farooq Mustafa <[email protected]> wrote:
>>
>>> Hello,
>>>
>>>
>>> I am writing my first custom processor. I followed the example here
>>> https://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/
>>>
>>>
>>> In the settings, I have /tmp/jsoniput/sample1.json file
>>>
>>>
>>> The source code;
>>>
>>> public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>>>
>>>     final AtomicReference<String> value = new AtomicReference<>();
>>>     getLogger().warn("sample - Farooq - 1 , value = <" + value.toString() + ">");
>>>
>>>     FlowFile flowfile = session.get();
>>>     getLogger().warn("sample - Farooq - 2");
>>>
>>>     if ( null == flowfile) {
>>>         getLogger().warn("sample - Farooq - 2.1 - flowfile is NULL *** ");
>>>         flowfile = session.create();
>>>         flowfile = session.get();
>>>         getLogger().warn("sample - Farooq - 2.2 - session.create() / session.get() --- NOT SURE ");
>>>     }
>>>     getLogger().warn("sample - Farooq - 2.3 JSON_PATH :" + context.getProperty(JSON_PATH).getValue());
>>>
>>>     session.read(flowfile, new InputStreamCallback() {
>>>         @Override
>>>         public void process(InputStream in) throws IOException {
>>>
>>>             try {
>>>                 getLogger().warn("sample - Farooq - 3 JSON_PATH :" + context.getProperty(JSON_PATH).getValue());
>>>                 Object obj = new JSONParser().parse(new FileReader("JSON_PATH"));
>>>                 getLogger().warn("sample - Farooq - 3.1 ");
>>>
>>>
>>>
>>> The log output:
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 1 , value = <null>
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2.1 - flowfile is NULL ***
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2.2 - session.create() / session.get() --- NOT SURE
>>>
>>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2.3 JSON_PATH :/tmp/jsoninput
>>>
>>> 2019-07-20 13:58:42,626 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] failed to process session due to java.lang.NullPointerException; Processor Administratively Yielded for 1 sec: java.lang.NullPointerException
>>>
>>> java.lang.NullPointerException: null
>>>
>>>     at org.apache.nifi.controller.repository.StandardProcessSession.getRecord(StandardProcessSession.java:574)
>>>
>>>     at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3132)
>>>
>>>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2187)
>>>
>>>
>>> And the basic flow:
>>> [image: image.png]
>>>
>>>
>>>
>>> Any ideas or guidance will be appreciated.
>>>
>>>
>>> Thanks
>>>
>>> Farooq
>>>
>>
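
For anyone following the thread later, the second point above (the quoted "JSON_PATH" literal) can be reproduced without NiFi at all. The sketch below is only an illustration using plain java.io. The class name JsonPathDemo, the method readAll, and the temp file are made up for this example, and the configuredPath argument merely stands in for whatever context.getProperty(JSON_PATH).getValue() would return in the processor.

```java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class, not part of NiFi or of the original processor.
class JsonPathDemo {

    // Reads the whole file at the given path. In the processor, the argument
    // would be the property VALUE, not the quoted property name.
    static String readAll(String configuredPath) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(configuredPath))) {
            StringBuilder contents = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                contents.append(line);
            }
            return contents.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a temp file stands in for the real JSON input file.
        Path tmp = Files.createTempFile("sample1", ".json");
        Files.write(tmp, "{\"name\":\"demo\"}".getBytes(StandardCharsets.UTF_8));

        // Passing the actual path works.
        System.out.println(readAll(tmp.toString()));

        // Passing the literal makes FileReader look for a file called
        // JSON_PATH relative to the current working directory.
        try {
            readAll("JSON_PATH");
        } catch (FileNotFoundException e) {
            System.out.println("literal lookup failed: " + e.getMessage());
        }

        Files.deleteIfExists(tmp);
    }
}
```

Note that the log in the original message shows the property value as /tmp/jsoninput, which looks like a directory rather than the sample1.json file, so the configured value may also need checking once the quotes are removed.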
