I had to do a custom build of NiFi to add distributed-cache support for the scripting processors, as described here:
https://github.com/xmlking/mapr-nifi-hadoop-libraries-bundle/blob/master/nifi-mapr-build.md#add-optional

Please consider adding this support in the next release.
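For anyone following the quoted thread: the Groovy script in it passes a StringSerDe instance as both the serializer and the deserializer, but that class is never shown. Here is a minimal Java sketch of what such a class could look like. The nested Serializer and Deserializer interfaces are local stand-ins that only copy the method shapes quoted in the thread (serialize(value, out) and deserialize(byte[])); they are assumptions for illustration, not the NiFi types, and StringSerDeSketch and roundTrip are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch. The two nested interfaces are local stand-ins that copy
// the method shapes quoted in this thread; they are NOT the NiFi interfaces.
class StringSerDeSketch {

    // Stand-in for the cache client's serializer contract (assumption).
    interface Serializer<T> {
        void serialize(T value, OutputStream out) throws IOException;
    }

    // Stand-in for the cache client's deserializer contract (assumption).
    interface Deserializer<T> {
        T deserialize(byte[] input) throws IOException;
    }

    // One class covering both directions, so a single instance can be passed
    // as the key serializer and as the value deserializer.
    static final class StringSerDe implements Serializer<String>, Deserializer<String> {
        @Override
        public void serialize(final String value, final OutputStream out) throws IOException {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public String deserialize(final byte[] input) throws IOException {
            // Treat a missing or empty payload as "no value".
            if (input == null || input.length == 0) {
                return null;
            }
            return new String(input, StandardCharsets.UTF_8);
        }
    }

    // Serializes a string to bytes and reads it back, without any cache involved.
    static String roundTrip(final String value) {
        final StringSerDe serde = new StringSerDe();
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            serde.serialize(value, out);
            return serde.deserialize(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(roundTrip("aaa"));
    }
}
```

A class like this only helps if it implements the very same Serializer/Deserializer interfaces that the cache client service was loaded with. The MissingMethodException in the quoted log is consistent with the copies bundled in the module-directory jar not being recognized as those interfaces, which is why the dependency had to go into the scripting NAR itself.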
-Sumo

> On Jul 16, 2016, at 10:17 PM, Sumanth Chinthagunta <[email protected]> wrote:
>
> Hi Matt,
>
> Did you find a way to use DistributedMapCacheClientService from the
> scripting processors?
>
> I tried using the module directory with nifi-sumo-common-0.7.0-SNAPSHOT-all.jar,
> which bundles the org.apache.nifi:nifi-distributed-cache-client-service-api
> dependency:
> https://github.com/xmlking/nifi-scripting/releases/tag/0.7.0
>
> but I am getting a weird error :(
>
> It would be nice if this dependency were bundled with the scripting processor
> NAR in NiFi 0.7.1 :)
>
> <dependency>
>     <groupId>org.apache.nifi</groupId>
>     <artifactId>nifi-distributed-cache-client-service-api</artifactId>
> </dependency>
>
> My Groovy script:
>
> import org.apache.nifi.controller.ControllerService
> import com.crossbusiness.nifi.processors.StringSerDe
>
> final StringSerDe stringSerDe = new StringSerDe();
>
> def lookup = context.controllerServiceLookup
> def cacheServiceName = DistributedMapCacheClientServiceName.value
>
> log.error "cacheServiceName: ${cacheServiceName}"
>
> def cacheServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
>     cs -> lookup.getControllerServiceName(cs) == cacheServiceName
> }
>
> log.error "cacheServiceId: ${cacheServiceId}"
>
> def cache = lookup.getControllerService(cacheServiceId)
> log.error cache.get("aaa", stringSerDe, stringSerDe )
>
> Error:
>
> 00:02:04 CDT
> ERROR
> 3886ddbc-1ccd-437e-8e34-f5a98602264b
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceName:
> DistributedMapCacheClientService
>
> 00:02:04 CDT
> ERROR
> 3886ddbc-1ccd-437e-8e34-f5a98602264b
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceId:
> 8971c8e8-6bc5-4e07-8e30-7189fa8a5252
>
> 00:02:04 CDT
> ERROR
> 3886ddbc-1ccd-437e-8e34-f5a98602264b
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b]
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] failed to process due
> to org.apache.nifi.processor.exception.ProcessException:
> javax.script.ScriptException: javax.script.ScriptException:
> groovy.lang.MissingMethodException: No signature of method:
> com.sun.proxy.$Proxy123.get() is applicable for argument types:
> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe,
> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa,
> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b, ...]
> Possible solutions: grep(), get(java.lang.Object,
> org.apache.nifi.distributed.cache.client.Serializer,
> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(),
> getAt(java.lang.String), every(); rolling back session:
> org.apache.nifi.processor.exception.ProcessException:
> javax.script.ScriptException: javax.script.ScriptException:
> groovy.lang.MissingMethodException: No signature of method:
> com.sun.proxy.$Proxy123.get() is applicable for argument types:
> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe,
> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa,
> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b, ...]
> Possible solutions: grep(), get(java.lang.Object,
> org.apache.nifi.distributed.cache.client.Serializer,
> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(),
> getAt(java.lang.String), every()
>
> 00:02:04 CDT
> ERROR
> 3886ddbc-1ccd-437e-8e34-f5a98602264b
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] Failed to process
> session due to org.apache.nifi.processor.exception.ProcessException:
> javax.script.ScriptException: javax.script.ScriptException:
> groovy.lang.MissingMethodException: No signature of method:
> com.sun.proxy.$Proxy123.get() is applicable for argument types:
> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe,
> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa,
> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b, ...]
> Possible solutions: grep(), get(java.lang.Object,
> org.apache.nifi.distributed.cache.client.Serializer,
> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(),
> getAt(java.lang.String), every():
> org.apache.nifi.processor.exception.ProcessException:
> javax.script.ScriptException: javax.script.ScriptException:
> groovy.lang.MissingMethodException: No signature of method:
> com.sun.proxy.$Proxy123.get() is applicable for argument types:
> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe,
> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa,
> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b, ...]
> Possible solutions: grep(), get(java.lang.Object,
> org.apache.nifi.distributed.cache.client.Serializer,
> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(),
> getAt(java.lang.String), every()
>
>> On Jul 14, 2016, at 4:57 AM, Matt Burgess <[email protected]> wrote:
>>
>> Sumo,
>>
>> That package won't be available to scripts because it is not available
>> to ExecuteScript, that's what I meant about not being able to refer to
>> the class directly :) Instead you can get a reference to the
>> ControllerService by name, then just call methods even though you
>> aren't directly referencing the interface that defines them (thanks
>> Groovy!)
>>
>> However this approach may only work if the methods you're calling
>> don't require classes you don't have access to. In this case you might
>> not be able to use this approach as the get() and put() methods
>> require a Serializer/Deserializer which are in the same package/NAR as
>> the DistributedMapCacheClient.
>>
>> I'll give it a try myself to see if I can find a way (using closures
>> -- but not interface coercion since I can't reference the class) to
>> achieve this, but otherwise you can use the first blog post from my
>> last email to write a simple client of your own inside Groovy.
>>
>> Regards,
>> Matt
>>
>> On Thu, Jul 14, 2016 at 1:04 AM, Sumanth Chinthagunta <[email protected]> wrote:
>>> Realized I also need to set up DistributedMapCacheClientService along with
>>> DistributedMapCacheServer (I wonder how many instances of the service will
>>> be running in a clustered environment).
>>>
>>> But I am still looking for guidelines on how to make
>>> org.apache.nifi.distributed.cache.* available to the ExecuteScript processor!
>>> Thanks
>>> Sumo
>>>
>>>
>>>> On Jul 13, 2016, at 9:12 PM, Sumanth Chinthagunta <[email protected]> wrote:
>>>>
>>>> Matt,
>>>> I set up a DistributedMapCacheServer controller service and am trying to
>>>> run the following script. Am I doing this correctly?
>>>> I have a couple of issues:
>>>> 1. How do I make the org.apache.nifi.distributed.cache.* package available
>>>> to ExecuteScript?
>>>> 2. When I try cache.get("1",null,null), I get the following error:
>>>> Failed to process session due to
>>>> org.apache.nifi.processor.exception.ProcessException:
>>>> javax.script.ScriptException: javax.script.ScriptException:
>>>> groovy.lang.MissingMethodException: No signature of method:
>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types:
>>>> (java.lang.String, null, null) values: [1, null, null]
>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String),
>>>> dump(), find(): org.apache.nifi.processor.exception.ProcessException:
>>>> javax.script.ScriptException: javax.script.ScriptException:
>>>> groovy.lang.MissingMethodException: No signature of method:
>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types:
>>>> (java.lang.String, null, null) values: [1, null, null]
>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String),
>>>> dump(), find()
>>>>
>>>>
>>>> import org.apache.nifi.controller.ControllerService
>>>> import org.apache.nifi.distributed.cache.client.Deserializer;
>>>> import org.apache.nifi.distributed.cache.client.Serializer;
>>>> import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
>>>> import org.apache.nifi.distributed.cache.client.exception.SerializationException;
>>>>
>>>> static class StringSerializer implements Serializer<String> {
>>>>
>>>>     @Override
>>>>     public void serialize(final String value, final OutputStream out)
>>>>             throws SerializationException, IOException {
>>>>         out.write(value.getBytes(StandardCharsets.UTF_8));
>>>>     }
>>>> }
>>>>
>>>> static class CacheValueDeserializer implements Deserializer<byte[]> {
>>>>
>>>>     @Override
>>>>     public byte[] deserialize(final byte[] input) throws
>>>>             DeserializationException, IOException {
>>>>         if (input == null || input.length == 0) {
>>>>             return null;
>>>>         }
>>>>         return input;
>>>>     }
>>>> }
>>>>
>>>> private final Serializer<String> keySerializer = new StringSerializer();
>>>> private final Deserializer<byte[]> valueDeserializer = new CacheValueDeserializer();
>>>>
>>>> def lookup = context.controllerServiceLookup
>>>> def cacheServerName = distributedMapCacheServerName.value
>>>>
>>>> def cacheServerId = lookup.getControllerServiceIdentifiers(ControllerService).find {
>>>>     cs -> lookup.getControllerServiceName(cs) == cacheServerName
>>>> }
>>>>
>>>> def cache = lookup.getControllerService(cacheServerId)
>>>>
>>>> //log.error cache.get("1",keySerializer,valueDeserializer)
>>>> log.error cache.get("1",null,null)
>>>>
>>>> Thanks
>>>> Sumo
>>>>
>>>>
>>>>> On Jul 13, 2016, at 6:13 PM, Matt Burgess <[email protected]> wrote:
>>>>>
>>>>> Sumo,
>>>>>
>>>>> I have some example code at
>>>>> http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html.
>>>>> Although it's not expressly for ExecuteScript it should be pretty
>>>>> usable as-is.
>>>>>
>>>>> Also you might be able to use the technique outlined in my other post:
>>>>> http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html.
>>>>> With this you would get a reference to the ControllerService for the
>>>>> DistributedMapCacheClient (although you likely won't be able to refer
>>>>> to the DistributedMapCacheClient class), but using dynamic method
>>>>> invocation (as outlined in the blog post) you can call its methods to
>>>>> get and put values.
>>>>>
>>>>> Regards,
>>>>> Matt
>>>>>
>>>>> On Wed, Jul 13, 2016 at 8:26 PM, Sumanth Chinthagunta <[email protected]> wrote:
>>>>>> Looking for an example script (for the ExecuteScript processor) that uses
>>>>>> DistributedMapCacheClient to put/get a key/value pair.
>>>>>>
>>>>>> Thanks
>>>>>> -Sumo
