Good point Matt. Typically its the pattern like this at the beginning
of onTrigger...
if (connection == null) {
synchronize(something) {
If (connection == null) {
// initialize
}
}
}
This way you don't synchronize every call to onTrigger which would be
expensive, but also protect against starting the processor with more
than 1 concurrent task where they both might trying to init the
connection.
On Wed, Feb 27, 2019 at 9:46 AM Matt Burgess <[email protected]> wrote:
>
> +1 to Bryan's comments. If I had to guess I'd say the error is
> probably in the line:
>
> kuduTable = clientService.getKuduClient().openTable(tableName);
>
> That's the only one that does heavy lifting vs standard NiFi API
> calls, and should be in a try/catch for that reason. Also IIRC setting
> up external connections in @OnScheduled is a bit of an anti-pattern as
> any uncaught exceptions cause the processor to not finish its
> scheduling initialization properly. This used to be a problem with
> leaked threads or memory or something (but has since been fixed). I
> think the conventional wisdom is to have an AtomicReference to your
> connection object (kuduTable here) and synchronize the onTrigger
> thread(s) so the "first one in" gets to create the connection object
> and the rest use it. If that's not accurate, hopefully someone who
> knows more about it will correct me and clarify :)
>
> Regards,
> Matt
>
> On Wed, Feb 27, 2019 at 9:32 AM Bryan Bende <[email protected]> wrote:
> >
> > Lets use one list only.
> >
> > I think there should be more to this error, I believe the invocation
> > target exception means another exception was thrown from OnScheduled
> > while the framework tried to execute OnScheduled.
> >
> > Not sure why the real exception isn't visible, but you could always
> > try temporarily wrapping the whole OnScheduled in a try/catch and
> > logging any exception.
> >
> > On Wed, Feb 27, 2019 at 8:24 AM Phillip Grenier <[email protected]> wrote:
> > >
> > > Kumar,
> > >
> > > I would validate you included the nifi-mock in your pom.xml.
> > >
> > > For more information on testing review the developer guide or this post.
> > >
> > > Hope that helps,
> > >
> > > Phillip
> > > https://nifi.rocks
> > >
> > > On Tue, Feb 26, 2019 at 6:04 PM Sandish Kumar HN <[email protected]>
> > > wrote:
> > >>
> > >> Hi,
> > >>
> > >>
> > >> *I'm stuck with below error:*
> > >>
> > >>
> > >> java.lang.AssertionError: Could not invoke methods annotated with
> > >> @OnScheduled annotation due to:
> > >> java.lang.reflect.InvocationTargetException
> > >>
> > >> at
> > >> org.apache.nifi.kudu.TestPutKuduT.testWriteKuduWithDefaults(TestPutKuduT.java:70)
> > >>
> > >>
> > >> [INFO]
> > >>
> > >> [INFO] Results:
> > >>
> > >> [INFO]
> > >>
> > >> [ERROR] Failures:
> > >>
> > >> [ERROR] TestPutKuduT.testWriteKuduWithDefaults:70 Could not invoke
> > >> methods annotated with @OnScheduled annotation due to:
> > >> java.lang.reflect.InvocationTargetException
> > >>
> > >>
> > >> *The test case is here :*
> > >>
> > >> @Test
> > >> public void testWriteKuduWithDefaults() throws InitializationException {
> > >> final TestRunner runner = TestRunners.newTestRunner(PutKudu.class);
> > >> setUpTestRunner(runner);
> > >>
> > >> final MockKuduClientService kuduClient =
> > >> getKuduClientService(runner);
> > >> createRecordReader(100, runner);
> > >>
> > >> final String filename = "testWriteKudu-" +
> > >> System.currentTimeMillis();
> > >> final Map<String,String> flowFileAttributes = new HashMap<>();
> > >> flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
> > >> runner.enqueue("trigger", flowFileAttributes);
> > >> runner.run();
> > >> runner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS);
> > >> // verify the successful flow file has the expected content &
> > >> attributes
> > >> final MockFlowFile mockFlowFile =
> > >> runner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
> > >> mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(),
> > >> filename);
> > >> mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100");
> > >> mockFlowFile.assertContentEquals("trigger");
> > >> }
> > >>
> > >> *and onSchedule meMethod is here:*
> > >>
> > >> @OnScheduled
> > >>
> > >> public void onScheduled(final ProcessContext context) throws
> > >> IOException, LoginException {
> > >>
> > >> final String tableName =
> > >> context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
> > >> operationType =
> > >> OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
> > >> batchSize =
> > >> context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
> > >> ffbatch =
> > >> context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
> > >> flushMode =
> > >> SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
> > >> getLogger().debug("Setting up Kudu connection...");
> > >> clientService =
> > >> context.getProperty(KUDU_CLIENT_SERVICE).asControllerService(KuduClientService.class);
> > >> kuduTable = clientService.getKuduClient().openTable(tableName);
> > >> getLogger().debug("Kudu connection successfully initialized");
> > >> }
> > >>
> > >> *Can someone one help me to bypass this issue? I tried all optiones
> > >> dicussed here
> > >> http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html
> > >> <http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html>
> > >> *