Added Jira https://issues.apache.org/jira/browse/NIFI-2299 <https://issues.apache.org/jira/browse/NIFI-2299>
Thanks -Sumo > On Jul 17, 2016, at 4:56 AM, Matt Burgess <[email protected]> wrote: > > Adding API JARs to the scripting NAR is a good idea since it extends the > capabilities as you have shown. Mind writing an improvement Jira to capture > this? > > Thanks, > Matt > > >> On Jul 17, 2016, at 4:51 AM, Sumanth Chinthagunta <[email protected]> wrote: >> >> I had to custom build NiFi to add distributed-cache support for scripting >> processors as described here: >> https://github.com/xmlking/mapr-nifi-hadoop-libraries-bundle/blob/master/nifi-mapr-build.md#add-optional >> >> <https://github.com/xmlking/mapr-nifi-hadoop-libraries-bundle/blob/master/nifi-mapr-build.md#add-optional> >> Please consider adding this support in next release. >> >> -Sumo >> >>> On Jul 16, 2016, at 10:17 PM, Sumanth Chinthagunta <[email protected]> >>> wrote: >>> >>> Hi Matt, >>> >>> Did you find any solution to use DistributedMapCacheClientService from >>> Scripting processors ? >>> >>> I tried to use module directory with >>> nifi-sumo-common-0.7.0-SNAPSHOT-all.jar bundling with >>> org.apache.nifi:nifi-distributed-cache-client-service-api dependency here: >>> https://github.com/xmlking/nifi-scripting/releases/tag/0.7.0 >>> <https://github.com/xmlking/nifi-scripting/releases/tag/0.7.0> >>> >>> but getting weird error :( >>> >>> Would be nice if this dependency is bundled with scripting processor Nar in >>> NiFi 0.7.1 :) >>> >>> <dependency> >>> <groupId>org.apache.nifi</groupId> >>> <artifactId>nifi-distributed-cache-client-service-api</artifactId> >>> </dependency> >>> >>> My Groovy script: >>> >>> import org.apache.nifi.controller.ControllerService >>> import com.crossbusiness.nifi.processors.StringSerDe >>> >>> final StringSerDe stringSerDe = new StringSerDe(); >>> >>> def lookup = context.controllerServiceLookup >>> def cacheServiceName = DistributedMapCacheClientServiceName.value >>> >>> log.error "cacheServiceName: ${cacheServiceName}" >>> >>> def cacheServiceId = >>> lookup.getControllerServiceIdentifiers(ControllerService).find { >>> cs -> lookup.getControllerServiceName(cs) == cacheServiceName >>> } >>> >>> log.error "cacheServiceId: ${cacheServiceId}" >>> >>> def cache = lookup.getControllerService(cacheServiceId) >>> log.error cache.get("aaa", stringSerDe, stringSerDe ) >>> Error: >>> >>> 00:02:04 CDT >>> ERROR >>> 3886ddbc-1ccd-437e-8e34-f5a98602264b >>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceName: >>> DistributedMapCacheClientService >>> 00:02:04 CDT >>> ERROR >>> 3886ddbc-1ccd-437e-8e34-f5a98602264b >>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceId: >>> 8971c8e8-6bc5-4e07-8e30-7189fa8a5252 >>> 00:02:04 CDT >>> ERROR >>> 3886ddbc-1ccd-437e-8e34-f5a98602264b >>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] >>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] failed to process >>> due to org.apache.nifi.processor.exception.ProcessException: >>> javax.script.ScriptException: javax.script.ScriptException: >>> groovy.lang.MissingMethodException: No signature of method: >>> com.sun.proxy.$Proxy123.get() is applicable for argument types: >>> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, >>> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, >>> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b >>> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...] >>> Possible solutions: grep(), get(java.lang.Object, >>> org.apache.nifi.distributed.cache.client.Serializer, >>> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), >>> getAt(java.lang.String), every(); rolling back session: >>> org.apache.nifi.processor.exception.ProcessException: >>> javax.script.ScriptException: javax.script.ScriptException: >>> groovy.lang.MissingMethodException: No signature of method: >>> com.sun.proxy.$Proxy123.get() is applicable for argument types: >>> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, >>> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, >>> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b >>> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...] >>> Possible solutions: grep(), get(java.lang.Object, >>> org.apache.nifi.distributed.cache.client.Serializer, >>> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), >>> getAt(java.lang.String), every() >>> 00:02:04 CDT >>> ERROR >>> 3886ddbc-1ccd-437e-8e34-f5a98602264b >>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] Failed to process >>> session due to org.apache.nifi.processor.exception.ProcessException: >>> javax.script.ScriptException: javax.script.ScriptException: >>> groovy.lang.MissingMethodException: No signature of method: >>> com.sun.proxy.$Proxy123.get() is applicable for argument types: >>> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, >>> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, >>> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b >>> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...] >>> Possible solutions: grep(), get(java.lang.Object, >>> org.apache.nifi.distributed.cache.client.Serializer, >>> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), >>> getAt(java.lang.String), every(): >>> org.apache.nifi.processor.exception.ProcessException: >>> javax.script.ScriptException: javax.script.ScriptException: >>> groovy.lang.MissingMethodException: No signature of method: >>> com.sun.proxy.$Proxy123.get() is applicable for argument types: >>> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, >>> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, >>> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b >>> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...] >>> Possible solutions: grep(), get(java.lang.Object, >>> org.apache.nifi.distributed.cache.client.Serializer, >>> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), >>> getAt(java.lang.String), every() >>> >>>> On Jul 14, 2016, at 4:57 AM, Matt Burgess <[email protected] >>>> <mailto:[email protected]>> wrote: >>>> >>>> Sumo, >>>> >>>> That package won't be available to scripts because it is not available >>>> to ExecuteScript, that's what I meant about not being able to refer to >>>> the class directly :) Instead you can get a reference to the >>>> ControllerService by name, then just call methods even though you >>>> aren't directly referencing the interface that defines them (thanks >>>> Groovy!) >>>> >>>> However this approach may only work if the methods you're calling >>>> don't require classes you don't have access to. In this case you might >>>> not be able to use this approach as the get() and put() methods >>>> require a Serializer/Deserializer which are in the same package/NAR as >>>> the DistributedMapCacheClient. >>>> >>>> I'll give it a try myself to see if I can find a way (using closures >>>> -- but not interface coercion since I can't reference the class) to >>>> achieve this, but otherwise you can use the first blog post from my >>>> last email to write a simple client of your own inside Groovy. >>>> >>>> Regards, >>>> Matt >>>> >>>>> On Thu, Jul 14, 2016 at 1:04 AM, Sumanth Chinthagunta <[email protected] >>>>> <mailto:[email protected]>> wrote: >>>>> Realized I also need to setup DistributedMapCacheClientService along with >>>>> DistributedMapCacheServer. [ wonder how many instances service will be >>>>> running in clustered env) >>>>> >>>>> but still looks for guidelines on how to make >>>>> org.apache.nifi.distributed.cache.* available for ExecuteScript >>>>> processor ! >>>>> Thanks >>>>> Sumo >>>>> >>>>> >>>>>> On Jul 13, 2016, at 9:12 PM, Sumanth Chinthagunta <[email protected] >>>>>> <mailto:[email protected]>> wrote: >>>>>> >>>>>> Matt, >>>>>> I setup DistributedMapCacheServer controller service and trying to run >>>>>> following script. Am I doing correct? >>>>>> I have couple of issues: >>>>>> 1. How do I make org.apache.nifi.distributed.cache.* package available >>>>>> for ExecuteScript? >>>>>> 2. When I try : cache.get("1",null,null), getting following error >>>>>> Failed to process session due to >>>>>> org.apache.nifi.processor.exception.ProcessException: >>>>>> javax.script.ScriptException: javax.script.ScriptException: >>>>>> groovy.lang.MissingMethodException: No signature of method: >>>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types: >>>>>> (java.lang.String, null, null) values: [1, null, null] >>>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String), >>>>>> dump(), find(): org.apache.nifi.processor.exception.ProcessException: >>>>>> javax.script.ScriptException: javax.script.ScriptException: >>>>>> groovy.lang.MissingMethodException: No signature of method: >>>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types: >>>>>> (java.lang.String, null, null) values: [1, null, null] >>>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String), >>>>>> dump(), find() >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> import org.apache.nifi.controller.ControllerService >>>>>> import org.apache.nifi.distributed.cache.client.Deserializer; >>>>>> import org.apache.nifi.distributed.cache.client.Serializer; >>>>>> import >>>>>> org.apache.nifi.distributed.cache.client.exception.DeserializationException; >>>>>> import >>>>>> org.apache.nifi.distributed.cache.client.exception.SerializationException; >>>>>> >>>>>> static class StringSerializer implements Serializer<String> { >>>>>> >>>>>> @Override >>>>>> public void serialize(final String value, final OutputStream out) >>>>>> throws SerializationException, IOException { >>>>>> out.write(value.getBytes(StandardCharsets.UTF_8)); >>>>>> } >>>>>> } >>>>>> >>>>>> static class CacheValueDeserializer implements Deserializer<byte[]> { >>>>>> >>>>>> @Override >>>>>> public byte[] deserialize(final byte[] input) throws >>>>>> DeserializationException, IOException { >>>>>> if (input == null || input.length == 0) { >>>>>> return null; >>>>>> } >>>>>> return input; >>>>>> } >>>>>> } >>>>>> >>>>>> private final Serializer<String> keySerializer = new StringSerializer(); >>>>>> private final Deserializer<byte[]> valueDeserializer = new >>>>>> CacheValueDeserializer(); >>>>>> >>>>>> def lookup = context.controllerServiceLookup >>>>>> def cacheServerName = distributedMapCacheServerName.value >>>>>> >>>>>> def cacheServerId = >>>>>> lookup.getControllerServiceIdentifiers(ControllerService).find { >>>>>> cs -> lookup.getControllerServiceName(cs) == cacheServerName >>>>>> } >>>>>> >>>>>> def cache = lookup.getControllerService(cacheServerId) >>>>>> >>>>>> //log.error cache.get("1",keySerializer,valueDeserializer) >>>>>> log.error cache.get("1",null,null) >>>>>> >>>>>> Thanks >>>>>> Sumo >>>>>> >>>>>> >>>>>> >>>>>> >>>>>>> On Jul 13, 2016, at 6:13 PM, Matt Burgess <[email protected] >>>>>>> <mailto:[email protected]> <mailto:[email protected] >>>>>>> <mailto:[email protected]>>> wrote: >>>>>>> >>>>>>> Sumo, >>>>>>> >>>>>>> I have some example code at >>>>>>> http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html >>>>>>> <http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html> >>>>>>> <http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html >>>>>>> <http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html>>. >>>>>>> Although it's not expressly for ExecuteScript it should be pretty >>>>>>> usable as-is. >>>>>>> >>>>>>> Also you might be able to use the technique outlined in my other post: >>>>>>> http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html >>>>>>> <http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html> >>>>>>> >>>>>>> <http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html >>>>>>> >>>>>>> <http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html>>. >>>>>>> With this you would get a reference to the ControllerService for the >>>>>>> DistributedMapCacheClient (although you likely won't be able to refer >>>>>>> to the DistributedMapCacheClient class), but using dynamic method >>>>>>> invocation (as outlined in the blog post) you can call its methods to >>>>>>> get and put values. >>>>>>> >>>>>>> Regards, >>>>>>> Matt >>>>>>> >>>>>>>> On Wed, Jul 13, 2016 at 8:26 PM, Sumanth Chinthagunta >>>>>>>> <[email protected] <mailto:[email protected]> >>>>>>>> <mailto:[email protected] <mailto:[email protected]>>> wrote: >>>>>>>> looking for example script ( for ExecuteScript processor) that uses >>>>>>>> DistributedMapCacheClient to put/get key/value pair. >>>>>>>> >>>>>>>> Thanks >>>>>>>> -Sumo >>
