Adding API JARs to the scripting NAR is a good idea since it extends the capabilities as you have shown. Mind writing an improvement Jira to capture this?
Thanks, Matt > On Jul 17, 2016, at 4:51 AM, Sumanth Chinthagunta <[email protected]> wrote: > > I had to custom build NiFi to add distributed-cache support for scripting > processors as described here: > https://github.com/xmlking/mapr-nifi-hadoop-libraries-bundle/blob/master/nifi-mapr-build.md#add-optional > > <https://github.com/xmlking/mapr-nifi-hadoop-libraries-bundle/blob/master/nifi-mapr-build.md#add-optional> > Please consider adding this support in next release. > > -Sumo > >> On Jul 16, 2016, at 10:17 PM, Sumanth Chinthagunta <[email protected]> wrote: >> >> Hi Matt, >> >> Did you find any solution to use DistributedMapCacheClientService from >> Scripting processors ? >> >> I tried to use module directory with nifi-sumo-common-0.7.0-SNAPSHOT-all.jar >> bundling with org.apache.nifi:nifi-distributed-cache-client-service-api >> dependency here: >> https://github.com/xmlking/nifi-scripting/releases/tag/0.7.0 >> <https://github.com/xmlking/nifi-scripting/releases/tag/0.7.0> >> >> but getting weird error :( >> >> Would be nice if this dependency is bundled with scripting processor Nar in >> NiFi 0.7.1 :) >> >> <dependency> >> <groupId>org.apache.nifi</groupId> >> <artifactId>nifi-distributed-cache-client-service-api</artifactId> >> </dependency> >> >> My Groovy script: >> >> import org.apache.nifi.controller.ControllerService >> import com.crossbusiness.nifi.processors.StringSerDe >> >> final StringSerDe stringSerDe = new StringSerDe(); >> >> def lookup = context.controllerServiceLookup >> def cacheServiceName = DistributedMapCacheClientServiceName.value >> >> log.error "cacheServiceName: ${cacheServiceName}" >> >> def cacheServiceId = >> lookup.getControllerServiceIdentifiers(ControllerService).find { >> cs -> lookup.getControllerServiceName(cs) == cacheServiceName >> } >> >> log.error "cacheServiceId: ${cacheServiceId}" >> >> def cache = lookup.getControllerService(cacheServiceId) >> log.error cache.get("aaa", stringSerDe, stringSerDe ) >> Error: >> >> 00:02:04 CDT >> ERROR >> 3886ddbc-1ccd-437e-8e34-f5a98602264b >> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceName: >> DistributedMapCacheClientService >> 00:02:04 CDT >> ERROR >> 3886ddbc-1ccd-437e-8e34-f5a98602264b >> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceId: >> 8971c8e8-6bc5-4e07-8e30-7189fa8a5252 >> 00:02:04 CDT >> ERROR >> 3886ddbc-1ccd-437e-8e34-f5a98602264b >> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] >> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] failed to process due >> to org.apache.nifi.processor.exception.ProcessException: >> javax.script.ScriptException: javax.script.ScriptException: >> groovy.lang.MissingMethodException: No signature of method: >> com.sun.proxy.$Proxy123.get() is applicable for argument types: >> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, >> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, >> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b >> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...] >> Possible solutions: grep(), get(java.lang.Object, >> org.apache.nifi.distributed.cache.client.Serializer, >> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), >> getAt(java.lang.String), every(); rolling back session: >> org.apache.nifi.processor.exception.ProcessException: >> javax.script.ScriptException: javax.script.ScriptException: >> groovy.lang.MissingMethodException: No signature of method: >> com.sun.proxy.$Proxy123.get() is applicable for argument types: >> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, >> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, >> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b >> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...] >> Possible solutions: grep(), get(java.lang.Object, >> org.apache.nifi.distributed.cache.client.Serializer, >> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), >> getAt(java.lang.String), every() >> 00:02:04 CDT >> ERROR >> 3886ddbc-1ccd-437e-8e34-f5a98602264b >> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] Failed to process >> session due to org.apache.nifi.processor.exception.ProcessException: >> javax.script.ScriptException: javax.script.ScriptException: >> groovy.lang.MissingMethodException: No signature of method: >> com.sun.proxy.$Proxy123.get() is applicable for argument types: >> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, >> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, >> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b >> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...] >> Possible solutions: grep(), get(java.lang.Object, >> org.apache.nifi.distributed.cache.client.Serializer, >> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), >> getAt(java.lang.String), every(): >> org.apache.nifi.processor.exception.ProcessException: >> javax.script.ScriptException: javax.script.ScriptException: >> groovy.lang.MissingMethodException: No signature of method: >> com.sun.proxy.$Proxy123.get() is applicable for argument types: >> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, >> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, >> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b >> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...] >> Possible solutions: grep(), get(java.lang.Object, >> org.apache.nifi.distributed.cache.client.Serializer, >> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), >> getAt(java.lang.String), every() >> >>> On Jul 14, 2016, at 4:57 AM, Matt Burgess <[email protected] >>> <mailto:[email protected]>> wrote: >>> >>> Sumo, >>> >>> That package won't be available to scripts because it is not available >>> to ExecuteScript, that's what I meant about not being able to refer to >>> the class directly :) Instead you can get a reference to the >>> ControllerService by name, then just call methods even though you >>> aren't directly referencing the interface that defines them (thanks >>> Groovy!) >>> >>> However this approach may only work if the methods you're calling >>> don't require classes you don't have access to. In this case you might >>> not be able to use this approach as the get() and put() methods >>> require a Serializer/Deserializer which are in the same package/NAR as >>> the DistributedMapCacheClient. >>> >>> I'll give it a try myself to see if I can find a way (using closures >>> -- but not interface coercion since I can't reference the class) to >>> achieve this, but otherwise you can use the first blog post from my >>> last email to write a simple client of your own inside Groovy. >>> >>> Regards, >>> Matt >>> >>>> On Thu, Jul 14, 2016 at 1:04 AM, Sumanth Chinthagunta <[email protected] >>>> <mailto:[email protected]>> wrote: >>>> Realized I also need to setup DistributedMapCacheClientService along with >>>> DistributedMapCacheServer. [ wonder how many instances service will be >>>> running in clustered env) >>>> >>>> but still looks for guidelines on how to make >>>> org.apache.nifi.distributed.cache.* available for ExecuteScript processor >>>> ! >>>> Thanks >>>> Sumo >>>> >>>> >>>>> On Jul 13, 2016, at 9:12 PM, Sumanth Chinthagunta <[email protected] >>>>> <mailto:[email protected]>> wrote: >>>>> >>>>> Matt, >>>>> I setup DistributedMapCacheServer controller service and trying to run >>>>> following script. Am I doing correct? >>>>> I have couple of issues: >>>>> 1. How do I make org.apache.nifi.distributed.cache.* package available >>>>> for ExecuteScript? >>>>> 2. When I try : cache.get("1",null,null), getting following error >>>>> Failed to process session due to >>>>> org.apache.nifi.processor.exception.ProcessException: >>>>> javax.script.ScriptException: javax.script.ScriptException: >>>>> groovy.lang.MissingMethodException: No signature of method: >>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types: >>>>> (java.lang.String, null, null) values: [1, null, null] >>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String), >>>>> dump(), find(): org.apache.nifi.processor.exception.ProcessException: >>>>> javax.script.ScriptException: javax.script.ScriptException: >>>>> groovy.lang.MissingMethodException: No signature of method: >>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types: >>>>> (java.lang.String, null, null) values: [1, null, null] >>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String), >>>>> dump(), find() >>>>> >>>>> >>>>> >>>>> >>>>> import org.apache.nifi.controller.ControllerService >>>>> import org.apache.nifi.distributed.cache.client.Deserializer; >>>>> import org.apache.nifi.distributed.cache.client.Serializer; >>>>> import >>>>> org.apache.nifi.distributed.cache.client.exception.DeserializationException; >>>>> import >>>>> org.apache.nifi.distributed.cache.client.exception.SerializationException; >>>>> >>>>> static class StringSerializer implements Serializer<String> { >>>>> >>>>> @Override >>>>> public void serialize(final String value, final OutputStream out) >>>>> throws SerializationException, IOException { >>>>> out.write(value.getBytes(StandardCharsets.UTF_8)); >>>>> } >>>>> } >>>>> >>>>> static class CacheValueDeserializer implements Deserializer<byte[]> { >>>>> >>>>> @Override >>>>> public byte[] deserialize(final byte[] input) throws >>>>> DeserializationException, IOException { >>>>> if (input == null || input.length == 0) { >>>>> return null; >>>>> } >>>>> return input; >>>>> } >>>>> } >>>>> >>>>> private final Serializer<String> keySerializer = new StringSerializer(); >>>>> private final Deserializer<byte[]> valueDeserializer = new >>>>> CacheValueDeserializer(); >>>>> >>>>> def lookup = context.controllerServiceLookup >>>>> def cacheServerName = distributedMapCacheServerName.value >>>>> >>>>> def cacheServerId = >>>>> lookup.getControllerServiceIdentifiers(ControllerService).find { >>>>> cs -> lookup.getControllerServiceName(cs) == cacheServerName >>>>> } >>>>> >>>>> def cache = lookup.getControllerService(cacheServerId) >>>>> >>>>> //log.error cache.get("1",keySerializer,valueDeserializer) >>>>> log.error cache.get("1",null,null) >>>>> >>>>> Thanks >>>>> Sumo >>>>> >>>>> >>>>> >>>>> >>>>>> On Jul 13, 2016, at 6:13 PM, Matt Burgess <[email protected] >>>>>> <mailto:[email protected]> <mailto:[email protected] >>>>>> <mailto:[email protected]>>> wrote: >>>>>> >>>>>> Sumo, >>>>>> >>>>>> I have some example code at >>>>>> http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html >>>>>> <http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html> >>>>>> <http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html >>>>>> <http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html>>. >>>>>> Although it's not expressly for ExecuteScript it should be pretty >>>>>> usable as-is. >>>>>> >>>>>> Also you might be able to use the technique outlined in my other post: >>>>>> http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html >>>>>> <http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html> >>>>>> >>>>>> <http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html >>>>>> <http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html>>. >>>>>> With this you would get a reference to the ControllerService for the >>>>>> DistributedMapCacheClient (although you likely won't be able to refer >>>>>> to the DistributedMapCacheClient class), but using dynamic method >>>>>> invocation (as outlined in the blog post) you can call its methods to >>>>>> get and put values. >>>>>> >>>>>> Regards, >>>>>> Matt >>>>>> >>>>>>> On Wed, Jul 13, 2016 at 8:26 PM, Sumanth Chinthagunta >>>>>>> <[email protected] <mailto:[email protected]> <mailto:[email protected] >>>>>>> <mailto:[email protected]>>> wrote: >>>>>>> looking for example script ( for ExecuteScript processor) that uses >>>>>>> DistributedMapCacheClient to put/get key/value pair. >>>>>>> >>>>>>> Thanks >>>>>>> -Sumo >
