Added Jira https://issues.apache.org/jira/browse/NIFI-2299 

Thanks 
-Sumo

> On Jul 17, 2016, at 4:56 AM, Matt Burgess <[email protected]> wrote:
> 
> Adding API JARs to the scripting NAR is a good idea since it extends the 
> capabilities as you have shown. Mind writing an improvement Jira to capture 
> this?
> 
> Thanks,
> Matt
> 
> 
>> On Jul 17, 2016, at 4:51 AM, Sumanth Chinthagunta <[email protected]> wrote:
>> 
>> I had to build a custom NiFi to add distributed-cache support for scripting
>> processors, as described here:
>> https://github.com/xmlking/mapr-nifi-hadoop-libraries-bundle/blob/master/nifi-mapr-build.md#add-optional
>> Please consider adding this support in the next release.
>> 
>> -Sumo 
>> 
>>> On Jul 16, 2016, at 10:17 PM, Sumanth Chinthagunta <[email protected]> wrote:
>>> 
>>> Hi Matt,
>>> 
>>> Did you find a way to use DistributedMapCacheClientService from
>>> scripting processors?
>>> 
>>> I tried pointing the module directory at
>>> nifi-sumo-common-0.7.0-SNAPSHOT-all.jar, which bundles the
>>> org.apache.nifi:nifi-distributed-cache-client-service-api dependency, from here:
>>> https://github.com/xmlking/nifi-scripting/releases/tag/0.7.0
>>> 
>>> but I am getting a weird error :(
>>> 
>>> It would be nice if this dependency were bundled with the scripting processor
>>> NAR in NiFi 0.7.1 :)
>>> 
>>>     <dependency>
>>>         <groupId>org.apache.nifi</groupId>
>>>         <artifactId>nifi-distributed-cache-client-service-api</artifactId>
>>>     </dependency>
>>> 
>>> My Groovy script:
>>> 
>>> import org.apache.nifi.controller.ControllerService
>>> import com.crossbusiness.nifi.processors.StringSerDe
>>> 
>>> final StringSerDe stringSerDe = new StringSerDe();
>>> 
>>> def lookup = context.controllerServiceLookup
>>> def cacheServiceName = DistributedMapCacheClientServiceName.value
>>> 
>>> log.error  "cacheServiceName: ${cacheServiceName}"
>>> 
>>> def cacheServiceId = 
>>> lookup.getControllerServiceIdentifiers(ControllerService).find {
>>>   cs -> lookup.getControllerServiceName(cs) == cacheServiceName
>>> }
>>> 
>>> log.error  "cacheServiceId:  ${cacheServiceId}"
>>> 
>>> def cache = lookup.getControllerService(cacheServiceId)
>>> log.error cache.get("aaa", stringSerDe, stringSerDe )
>>> Error: 
>>> 
>>> 00:02:04 CDT
>>> ERROR
>>> 3886ddbc-1ccd-437e-8e34-f5a98602264b
>>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceName: 
>>> DistributedMapCacheClientService
>>> 00:02:04 CDT
>>> ERROR
>>> 3886ddbc-1ccd-437e-8e34-f5a98602264b
>>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] cacheServiceId:  
>>> 8971c8e8-6bc5-4e07-8e30-7189fa8a5252
>>> 00:02:04 CDT
>>> ERROR
>>> 3886ddbc-1ccd-437e-8e34-f5a98602264b
>>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] 
>>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] failed to process 
>>> due to org.apache.nifi.processor.exception.ProcessException: 
>>> javax.script.ScriptException: javax.script.ScriptException: 
>>> groovy.lang.MissingMethodException: No signature of method: 
>>> com.sun.proxy.$Proxy123.get() is applicable for argument types: 
>>> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, 
>>> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, 
>>> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b, ...]
>>> Possible solutions: grep(), get(java.lang.Object, 
>>> org.apache.nifi.distributed.cache.client.Serializer, 
>>> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), 
>>> getAt(java.lang.String), every(); rolling back session: 
>>> org.apache.nifi.processor.exception.ProcessException: 
>>> javax.script.ScriptException: javax.script.ScriptException: 
>>> groovy.lang.MissingMethodException: No signature of method: 
>>> com.sun.proxy.$Proxy123.get() is applicable for argument types: 
>>> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, 
>>> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, 
>>> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b, ...]
>>> Possible solutions: grep(), get(java.lang.Object, 
>>> org.apache.nifi.distributed.cache.client.Serializer, 
>>> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), 
>>> getAt(java.lang.String), every()
>>> 00:02:04 CDT
>>> ERROR
>>> 3886ddbc-1ccd-437e-8e34-f5a98602264b
>>> ExecuteScript[id=3886ddbc-1ccd-437e-8e34-f5a98602264b] Failed to process 
>>> session due to org.apache.nifi.processor.exception.ProcessException: 
>>> javax.script.ScriptException: javax.script.ScriptException: 
>>> groovy.lang.MissingMethodException: No signature of method: 
>>> com.sun.proxy.$Proxy123.get() is applicable for argument types: 
>>> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, 
>>> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, 
>>> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b, ...]
>>> Possible solutions: grep(), get(java.lang.Object, 
>>> org.apache.nifi.distributed.cache.client.Serializer, 
>>> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), 
>>> getAt(java.lang.String), every(): 
>>> org.apache.nifi.processor.exception.ProcessException: 
>>> javax.script.ScriptException: javax.script.ScriptException: 
>>> groovy.lang.MissingMethodException: No signature of method: 
>>> com.sun.proxy.$Proxy123.get() is applicable for argument types: 
>>> (java.lang.String, com.crossbusiness.nifi.processors.StringSerDe, 
>>> com.crossbusiness.nifi.processors.StringSerDe) values: [aaa, 
>>> com.crossbusiness.nifi.processors.StringSerDe@5b1ef80b, ...]
>>> Possible solutions: grep(), get(java.lang.Object, 
>>> org.apache.nifi.distributed.cache.client.Serializer, 
>>> org.apache.nifi.distributed.cache.client.Deserializer), wait(), any(), 
>>> getAt(java.lang.String), every()
>>> 
>>>> On Jul 14, 2016, at 4:57 AM, Matt Burgess <[email protected]> wrote:
>>>> 
>>>> Sumo,
>>>> 
>>>> That package won't be available to scripts because it is not available
>>>> to ExecuteScript, that's what I meant about not being able to refer to
>>>> the class directly :)  Instead you can get a reference to the
>>>> ControllerService by name, then just call methods even though you
>>>> aren't directly referencing the interface that defines them (thanks
>>>> Groovy!)
>>>> 
>>>> However this approach may only work if the methods you're calling
>>>> don't require classes you don't have access to. In this case you might
>>>> not be able to use this approach as the get() and put() methods
>>>> require a Serializer/Deserializer which are in the same package/NAR as
>>>> the DistributedMapCacheClient.
>>>> 
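
A minimal Java sketch of the limitation Matt describes above. It does not use the real NiFi API: KeySerializer, ValueDeserializer, MapCache and LookalikeSerializer are hypothetical stand-ins, and the in-memory map stands in for the cache server. The sketch calls a proxied service's get() by name only, which works when the arguments implement the interfaces the method declares, and is rejected when they merely look similar.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class DynamicInvokeSketch {

    // Hypothetical stand-ins for the cache API types; NOT the real NiFi interfaces.
    public interface KeySerializer { byte[] serialize(Object key); }
    public interface ValueDeserializer { Object deserialize(byte[] raw); }
    public interface MapCache {
        Object get(Object key, KeySerializer serializer, ValueDeserializer deserializer);
    }

    // Has the right method shape but does not implement KeySerializer,
    // like a script class that cannot see the interface it would need.
    public static class LookalikeSerializer {
        public byte[] serialize(Object key) {
            return key.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    // Builds a JDK dynamic proxy over an in-memory map, standing in for the
    // proxied controller service that a script receives from the lookup.
    public static Object newCacheProxy(Map<String, byte[]> store) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.getName().equals("get")) {
                throw new UnsupportedOperationException(method.getName());
            }
            byte[] rawKey = ((KeySerializer) args[1]).serialize(args[0]);
            byte[] rawValue = store.get(new String(rawKey, StandardCharsets.UTF_8));
            return ((ValueDeserializer) args[2]).deserialize(rawValue);
        };
        return Proxy.newProxyInstance(MapCache.class.getClassLoader(),
                new Class<?>[] {MapCache.class}, handler);
    }

    // Calls a method chosen by name and argument count only, so the caller
    // never names the interface that declares it.
    public static Object invokeByName(Object target, String name, Object... args)
            throws Exception {
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == args.length) {
                return m.invoke(target, args);
            }
        }
        throw new NoSuchMethodException(name);
    }

    // Arguments implement the declared interfaces: the call goes through.
    public static Object demoLookup() {
        Map<String, byte[]> store = new HashMap<>();
        store.put("aaa", "hello".getBytes(StandardCharsets.UTF_8));
        KeySerializer ks = key -> key.toString().getBytes(StandardCharsets.UTF_8);
        ValueDeserializer vd =
                raw -> raw == null ? null : new String(raw, StandardCharsets.UTF_8);
        try {
            return invokeByName(newCacheProxy(store), "get", "aaa", ks, vd);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Arguments only resemble the declared types: reflection rejects the call.
    public static boolean demoMismatchRejected() {
        Object cache = newCacheProxy(new HashMap<>());
        try {
            invokeByName(cache, "get", "aaa",
                    new LookalikeSerializer(), new LookalikeSerializer());
            return false;
        } catch (IllegalArgumentException e) {
            return true; // argument type mismatch
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoLookup());
        System.out.println(demoMismatchRejected());
    }
}
```

In plain Java the rejected call surfaces as an IllegalArgumentException; Groovy's dynamic dispatch reports the comparable situation as a MissingMethodException, which is the error quoted elsewhere in this thread.
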
>>>> I'll give it a try myself to see if I can find a way (using closures
>>>> -- but not interface coercion since I can't reference the class) to
>>>> achieve this, but otherwise you can use the first blog post from my
>>>> last email to write a simple client of your own inside Groovy.
>>>> 
>>>> Regards,
>>>> Matt
>>>> 
>>>>> On Thu, Jul 14, 2016 at 1:04 AM, Sumanth Chinthagunta <[email protected]> wrote:
>>>>> Realized I also need to set up DistributedMapCacheClientService along with
>>>>> DistributedMapCacheServer. (I wonder how many instances of the service will
>>>>> be running in a clustered env.)
>>>>> 
>>>>> But I am still looking for guidelines on how to make
>>>>> org.apache.nifi.distributed.cache.* available to the ExecuteScript
>>>>> processor!
>>>>> Thanks
>>>>> Sumo
>>>>> 
>>>>> 
>>>>>> On Jul 13, 2016, at 9:12 PM, Sumanth Chinthagunta <[email protected]> wrote:
>>>>>> 
>>>>>> Matt,
>>>>>> I set up a DistributedMapCacheServer controller service and am trying to run
>>>>>> the following script. Am I doing this correctly?
>>>>>> I have a couple of issues:
>>>>>> 1. How do I make the org.apache.nifi.distributed.cache.* package available
>>>>>> to ExecuteScript?
>>>>>> 2. When I try cache.get("1",null,null), I get the following error:
>>>>>> Failed to process session due to 
>>>>>> org.apache.nifi.processor.exception.ProcessException: 
>>>>>> javax.script.ScriptException: javax.script.ScriptException: 
>>>>>> groovy.lang.MissingMethodException: No signature of method: 
>>>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types: 
>>>>>> (java.lang.String, null, null) values: [1, null, null]
>>>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String), 
>>>>>> dump(), find(): org.apache.nifi.processor.exception.ProcessException: 
>>>>>> javax.script.ScriptException: javax.script.ScriptException: 
>>>>>> groovy.lang.MissingMethodException: No signature of method: 
>>>>>> com.sun.proxy.$Proxy129.get() is applicable for argument types: 
>>>>>> (java.lang.String, null, null) values: [1, null, null]
>>>>>> Possible solutions: grep(), wait(), any(), getAt(java.lang.String), 
>>>>>> dump(), find()
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> import java.nio.charset.StandardCharsets;
>>>>>> import org.apache.nifi.controller.ControllerService
>>>>>> import org.apache.nifi.distributed.cache.client.Deserializer;
>>>>>> import org.apache.nifi.distributed.cache.client.Serializer;
>>>>>> import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
>>>>>> import org.apache.nifi.distributed.cache.client.exception.SerializationException;
>>>>>> 
>>>>>> static class StringSerializer implements Serializer<String> {
>>>>>> 
>>>>>>  @Override
>>>>>>  public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
>>>>>>      out.write(value.getBytes(StandardCharsets.UTF_8));
>>>>>>  }
>>>>>> }
>>>>>> 
>>>>>> static class CacheValueDeserializer implements Deserializer<byte[]> {
>>>>>> 
>>>>>>  @Override
>>>>>>  public byte[] deserialize(final byte[] input) throws DeserializationException, IOException {
>>>>>>      if (input == null || input.length == 0) {
>>>>>>          return null;
>>>>>>      }
>>>>>>      return input;
>>>>>>  }
>>>>>> }
>>>>>> 
>>>>>> private final Serializer<String> keySerializer = new StringSerializer();
>>>>>> private final Deserializer<byte[]> valueDeserializer = new CacheValueDeserializer();
>>>>>> 
>>>>>> def lookup = context.controllerServiceLookup
>>>>>> def cacheServerName = distributedMapCacheServerName.value
>>>>>> 
>>>>>> def cacheServerId = 
>>>>>> lookup.getControllerServiceIdentifiers(ControllerService).find {
>>>>>>  cs -> lookup.getControllerServiceName(cs) == cacheServerName
>>>>>> }
>>>>>> 
>>>>>> def cache = lookup.getControllerService(cacheServerId)
>>>>>> 
>>>>>> //log.error cache.get("1",keySerializer,valueDeserializer)
>>>>>> log.error cache.get("1",null,null)
>>>>>> 
>>>>>> Thanks
>>>>>> Sumo
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On Jul 13, 2016, at 6:13 PM, Matt Burgess <[email protected]> wrote:
>>>>>>> 
>>>>>>> Sumo,
>>>>>>> 
>>>>>>> I have some example code at
>>>>>>> http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html.
>>>>>>> Although it's not expressly for ExecuteScript it should be pretty
>>>>>>> usable as-is.
>>>>>>> 
>>>>>>> Also you might be able to use the technique outlined in my other post:
>>>>>>> http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html.
>>>>>>> With this you would get a reference to the ControllerService for the
>>>>>>> DistributedMapCacheClient (although you likely won't be able to refer
>>>>>>> to the DistributedMapCacheClient class), but using dynamic method
>>>>>>> invocation (as outlined in the blog post) you can call its methods to
>>>>>>> get and put values.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Matt
>>>>>>> 
>>>>>>>> On Wed, Jul 13, 2016 at 8:26 PM, Sumanth Chinthagunta <[email protected]> wrote:
>>>>>>>> I'm looking for an example script (for the ExecuteScript processor) that uses
>>>>>>>> DistributedMapCacheClient to put/get a key/value pair.
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> -Sumo
>> 