[ 
https://issues.apache.org/jira/browse/NIFI-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15400796#comment-15400796
 ] 

Joseph Witt commented on NIFI-2299:
-----------------------------------

Here is code to test that it works.  You'll have to replace the ID found below 
with the ID of the controller service you'll make for your processor.  Due to 
the way controller services are scoped to their containing process it appears 
that InvokeScriptedProcessors will need their own instance.  Will create a JIRA 
for that.

{code}
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient

class GenerateFlowFileWithContent implements Processor {

    def REL_SUCCESS = new Relationship.Builder()
            .name('success')
            .description('The flow file with the specified content and/or 
filename was successfully transferred')
            .build();

    def CACHE = new PropertyDescriptor.Builder()
            .name('Map Cache').description('The map cache to use')
            .required(true)
            .identifiesControllerService(DistributedMapCacheClient).build()
    
    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [REL_SUCCESS] as Set }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
      try {
        def cacheService = 
context.getControllerService("3d4e295a-0156-1000-0000-0000187a0b8a")
        System.out.println "Cache Service: ${cacheService}"
      } catch(e) {
          throw new ProcessException(e)
      }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return 
null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch(name) {
            case 'Cache': return CACHE
            default: return null
        }
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, 
String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [CACHE] as List }

    @Override
    String getIdentifier() { return 'Cache-InvokeScriptedProcessor' }
    
}

processor = new GenerateFlowFileWithContent()
{code}

> Add standard services API dependency to Scripting Processors   
> ---------------------------------------------------------------
>
>                 Key: NIFI-2299
>                 URL: https://issues.apache.org/jira/browse/NIFI-2299
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 0.7.0, 0.6.1
>            Reporter: sumanth chinthagunta
>            Assignee: Joseph Witt
>              Labels: Scripts, groovy
>             Fix For: 1.0.0, 0.8.0
>
>
> Scripting Processors cannot used Controller Services such as 
> *DistributedMapCacheClientService* etc, as required dependencies are missing 
> for *ExecuteScript* Nar. By adding following dependencies we can open new 
> possibilities for Scripting Processors.  
> Add this dependency to 
> nifi/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/pom.xml
> {code}
> <dependency>
>     <groupId>org.apache.nifi</groupId>
>     <artifactId>nifi-standard-services-api-nar</artifactId>
>     <type>nar</type>
> </dependency>
> {code}
> Add this dependency to 
> nifi/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
> {code}
> <dependency>
>     <groupId>org.apache.nifi</groupId>
>     <artifactId>nifi-distributed-cache-client-service-api</artifactId>
> </dependency>
> {code}
> Then we can create scripting processor like:
> {code}
> import org.apache.nifi.controller.ControllerService
> import com.crossbusiness.nifi.processors.StringSerDe
> final StringSerDe stringSerDe = new StringSerDe();
> def lookup = context.controllerServiceLookup
> def cacheServiceName = DistributedMapCacheClientServiceName.value
> log.error  "cacheServiceName: ${cacheServiceName}"
> def cacheServiceId = 
> lookup.getControllerServiceIdentifiers(ControllerService).find {
>     cs -> lookup.getControllerServiceName(cs) == cacheServiceName
> }
> log.error  "cacheServiceId:  ${cacheServiceId}"
> def cache = lookup.getControllerService(cacheServiceId)
> log.error cache.get("aaa", stringSerDe, stringSerDe )
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to