I had to custom build NiFi to add distributed-cache support for scripting 
processors as described here: 
https://github.com/xmlking/mapr-nifi-hadoop-libraries-bundle/blob/master/nifi-mapr-build.md#add-optional
 
<https://github.com/xmlking/mapr-nifi-hadoop-libraries-bundle/blob/master/nifi-mapr-build.md#add-optional>
Please consider adding this support in next release. 

-Sumo 

> On Jul 16, 2016, at 10:17 PM, Sumanth Chinthagunta <[email protected]> wrote:
> 
> Hi Matt,
> 
> Did you find any solution to use DistributedMapCacheClientService from 
> Scripting processors ?
> 
> I tried to use module directory with nifi-sumo-common-0.7.0-SNAPSHOT-all.jar 
> bundling with org.apache.nifi:nifi-distributed-cache-client-service-api 
> dependency here: 
> https://github.com/xmlking/nifi-scripting/releases/tag/0.7.0 
> <https://github.com/xmlking/nifi-scripting/releases/tag/0.7.0>
> 
> but getting weird error :(
> 
> Would be nice if this dependency is bundled with scripting processor Nar in 
> NiFi 0.7.1 :)
> 
>        <dependency>
>             <groupId>org.apache.nifi</groupId>
>             <artifactId>nifi-distributed-cache-client-service-api</artifactId>
>         </dependency>
> 
> My Groovy script:
> 
> import org.apache.nifi.controller.ControllerService
> import com.crossbusiness.nifi.processors.StringSerDe
> 
> final StringSerDe stringSerDe = new StringSerDe();
> 
> def lookup = context.controllerServiceLookup
> def cacheServiceName = DistributedMapCacheClientServiceName.value
> 
> log.error  "cacheServiceName: ${cacheServiceName}"
> 
> def cacheServiceId = 
> lookup.getControllerServiceIdentifiers(ControllerService).find {
>     cs -> lookup.getControllerServiceName(cs) == cacheServiceName
> }
> 
> log.error  "cacheServiceId:  ${cacheServiceId}"
> 
> def cache = lookup.getControllerService(cacheServiceId)
> log.error cache.get("aaa", stringSerDe, stringSerDe )
>  Error: 
>  
> 00:02:04 CDT
> ERROR
> 3886ddbc-1ccd-437e-8e34-f5a98602264b
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceName: 
> DistributedMapCacheClientService
> 00:02:04 CDT
> ERROR
> 3886ddbc-1ccd-437e-8e34-f5a98602264b
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceId:  
> 8971c8e8-6bc5-4e07-8e30-7189fa8a5252
> 00:02:04 CDT
> ERROR
> 3886ddbc-1ccd-437e-8e34-f5a98602264b
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] 
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] failed to process due 
> to org.apache.nifi.processor.exception.ProcessException: 
> javax.script.ScriptException: javax.script.ScriptException: 
> groovy.lang.MissingMethodException: No signature of method: 
> com.sun.proxy.$Proxy123.get() is applicable for argument types: 
> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, 
> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, 
> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b 
> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...]
> Possible solutions: grep(), get(java.lang.Object, 
> org.apache.nifi.distributed.cache.client.Serializer, 
> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), 
> getAt(java.lang.String), every(); rolling back session: 
> org.apache.nifi.processor.exception.ProcessException: 
> javax.script.ScriptException: javax.script.ScriptException: 
> groovy.lang.MissingMethodException: No signature of method: 
> com.sun.proxy.$Proxy123.get() is applicable for argument types: 
> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, 
> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, 
> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b 
> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...]
> Possible solutions: grep(), get(java.lang.Object, 
> org.apache.nifi.distributed.cache.client.Serializer, 
> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), 
> getAt(java.lang.String), every()
> 00:02:04 CDT
> ERROR
> 3886ddbc-1ccd-437e-8e34-f5a98602264b
> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] Failed to process 
> session due to org.apache.nifi.processor.exception.ProcessException: 
> javax.script.ScriptException: javax.script.ScriptException: 
> groovy.lang.MissingMethodException: No signature of method: 
> com.sun.proxy.$Proxy123.get() is applicable for argument types: 
> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, 
> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, 
> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b 
> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...]
> Possible solutions: grep(), get(java.lang.Object, 
> org.apache.nifi.distributed.cache.client.Serializer, 
> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), 
> getAt(java.lang.String), every(): 
> org.apache.nifi.processor.exception.ProcessException: 
> javax.script.ScriptException: javax.script.ScriptException: 
> groovy.lang.MissingMethodException: No signature of method: 
> com.sun.proxy.$Proxy123.get() is applicable for argument types: 
> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, 
> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, 
> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b 
> <mailto:com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b>, ...]
> Possible solutions: grep(), get(java.lang.Object, 
> org.apache.nifi.distributed.cache.client.Serializer, 
> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), 
> getAt(java.lang.String), every()
> 
>> On Jul 14, 2016, at 4:57 AM, Matt Burgess <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Sumo,
>> 
>> That package won't be available to scripts because it is not available
>> to ExecuteScript, that's what I meant about not being able to refer to
>> the class directly :)  Instead you can get a reference to the
>> ControllerService by name, then just call methods even though you
>> aren't directly referencing the interface that defines them (thanks
>> Groovy!)
>> 
>> However this approach may only work if the methods you're calling
>> don't require classes you don't have access to. In this case you might
>> not be able to use this approach as the get() and put() methods
>> require a Serializer/Deserializer which are in the same package/NAR as
>> the DistributedMapCacheClient.
>> 
>> I'll give it a try myself to see if I can find a way (using closures
>> -- but not interface coercion since I can't reference the class) to
>> achieve this, but otherwise you can use the first blog post from my
>> last email to write a simple client of your own inside Groovy.
>> 
>> Regards,
>> Matt
>> 
>> On Thu, Jul 14, 2016 at 1:04 AM, Sumanth Chinthagunta <[email protected] 
>> <mailto:[email protected]>> wrote:
>>> Realized I also need to setup DistributedMapCacheClientService along with 
>>> DistributedMapCacheServer. [ wonder how many instances service will be 
>>> running in clustered env)
>>> 
>>> but still looks for guidelines on how to   make 
>>> org.apache.nifi.distributed.cache.* available  for ExecuteScript processor !
>>> Thanks
>>> Sumo
>>> 
>>> 
>>>> On Jul 13, 2016, at 9:12 PM, Sumanth Chinthagunta <[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>> 
>>>> Matt,
>>>> I setup DistributedMapCacheServer controller service and trying to run 
>>>> following script. Am I doing correct?
>>>> I have couple of issues:
>>>> 1. How do I make org.apache.nifi.distributed.cache.* package available  
>>>> for ExecuteScript?
>>>> 2. When I try : cache.get("1",null,null), getting following error
>>>> Failed to process session due to 
>>>> org.apache.nifi.processor.exception.ProcessException: 
>>>> javax.script.ScriptException: javax.script.ScriptException: 
>>>> groovy.lang.MissingMethodException: No signature of method: 
>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types: 
>>>> (java.lang.String, null, null) values: [1, null, null]
>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String), 
>>>> dump(), find(): org.apache.nifi.processor.exception.ProcessException: 
>>>> javax.script.ScriptException: javax.script.ScriptException: 
>>>> groovy.lang.MissingMethodException: No signature of method: 
>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types: 
>>>> (java.lang.String, null, null) values: [1, null, null]
>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String), 
>>>> dump(), find()
>>>> 
>>>> 
>>>> 
>>>> 
>>>> import org.apache.nifi.controller.ControllerService
>>>> import org.apache.nifi.distributed.cache.client.Deserializer;
>>>> import org.apache.nifi.distributed.cache.client.Serializer;
>>>> import 
>>>> org.apache.nifi.distributed.cache.client.exception.DeserializationException;
>>>> import 
>>>> org.apache.nifi.distributed.cache.client.exception.SerializationException;
>>>> 
>>>> static class StringSerializer implements Serializer<String> {
>>>> 
>>>>    @Override
>>>>    public void serialize(final String value, final OutputStream out) 
>>>> throws SerializationException, IOException {
>>>>        out.write(value.getBytes(StandardCharsets.UTF_8));
>>>>    }
>>>> }
>>>> 
>>>> static class CacheValueDeserializer implements Deserializer<byte[]> {
>>>> 
>>>>    @Override
>>>>    public byte[] deserialize(final byte[] input) throws 
>>>> DeserializationException, IOException {
>>>>        if (input == null || input.length == 0) {
>>>>            return null;
>>>>        }
>>>>        return input;
>>>>    }
>>>> }
>>>> 
>>>> private final Serializer<String> keySerializer = new StringSerializer();
>>>> private final Deserializer<byte[]> valueDeserializer = new 
>>>> CacheValueDeserializer();
>>>> 
>>>> def lookup = context.controllerServiceLookup
>>>> def cacheServerName = distributedMapCacheServerName.value
>>>> 
>>>> def cacheServerId = 
>>>> lookup.getControllerServiceIdentifiers(ControllerService).find {
>>>>    cs -> lookup.getControllerServiceName(cs) == cacheServerName
>>>> }
>>>> 
>>>> def cache = lookup.getControllerService(cacheServerId)
>>>> 
>>>> //log.error cache.get("1",keySerializer,valueDeserializer)
>>>> log.error cache.get("1",null,null)
>>>> 
>>>> Thanks
>>>> Sumo
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> On Jul 13, 2016, at 6:13 PM, Matt Burgess <[email protected] 
>>>>> <mailto:[email protected]> <mailto:[email protected] 
>>>>> <mailto:[email protected]>>> wrote:
>>>>> 
>>>>> Sumo,
>>>>> 
>>>>> I have some example code at
>>>>> http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html 
>>>>> <http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html> 
>>>>> <http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html 
>>>>> <http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html>>.
>>>>> Although it's not expressly for ExecuteScript it should be pretty
>>>>> usable as-is.
>>>>> 
>>>>> Also you might be able to use the technique outlined in my other post:
>>>>> http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html 
>>>>> <http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html> 
>>>>> <http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html 
>>>>> <http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html>>.
>>>>> With this you would get a reference to the ControllerService for the
>>>>> DistributedMapCacheClient (although you likely won't be able to refer
>>>>> to the DistributedMapCacheClient class), but using dynamic method
>>>>> invocation (as outlined in the blog post) you can call its methods to
>>>>> get and put values.
>>>>> 
>>>>> Regards,
>>>>> Matt
>>>>> 
>>>>> On Wed, Jul 13, 2016 at 8:26 PM, Sumanth Chinthagunta <[email protected] 
>>>>> <mailto:[email protected]> <mailto:[email protected] 
>>>>> <mailto:[email protected]>>> wrote:
>>>>>> looking for example script ( for ExecuteScript processor)  that uses 
>>>>>> DistributedMapCacheClient to put/get key/value pair.
>>>>>> 
>>>>>> Thanks
>>>>>> -Sumo
> 

Reply via email to