[
https://issues.apache.org/jira/browse/NIFI-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15400796#comment-15400796
]
Joseph Witt commented on NIFI-2299:
-----------------------------------
Here is code to test that it works. You'll have to replace the ID found below
with the ID of the controller service you'll make for your processor. Due to
the way controller services are scoped to their containing process it appears
that InvokeScriptedProcessors will need their own instance. Will create a JIRA
for that.
{code}
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
class GenerateFlowFileWithContent implements Processor {
def REL_SUCCESS = new Relationship.Builder()
.name('success')
.description('The flow file with the specified content and/or
filename was successfully transferred')
.build();
def CACHE = new PropertyDescriptor.Builder()
.name('Map Cache').description('The map cache to use')
.required(true)
.identifiesControllerService(DistributedMapCacheClient).build()
@Override
void initialize(ProcessorInitializationContext context) { }
@Override
Set<Relationship> getRelationships() { return [REL_SUCCESS] as Set }
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory
sessionFactory) throws ProcessException {
try {
def cacheService =
context.getControllerService("3d4e295a-0156-1000-0000-0000187a0b8a")
System.out.println "Cache Service: ${cacheService}"
} catch(e) {
throw new ProcessException(e)
}
}
@Override
Collection<ValidationResult> validate(ValidationContext context) { return
null }
@Override
PropertyDescriptor getPropertyDescriptor(String name) {
switch(name) {
case 'Cache': return CACHE
default: return null
}
}
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue,
String newValue) { }
@Override
List<PropertyDescriptor> getPropertyDescriptors() { return [CACHE] as List }
@Override
String getIdentifier() { return 'Cache-InvokeScriptedProcessor' }
}
processor = new GenerateFlowFileWithContent()
{code}
> Add standard services API dependency to Scripting Processors
> ---------------------------------------------------------------
>
> Key: NIFI-2299
> URL: https://issues.apache.org/jira/browse/NIFI-2299
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 0.7.0, 0.6.1
> Reporter: sumanth chinthagunta
> Assignee: Joseph Witt
> Labels: Scripts, groovy
> Fix For: 1.0.0, 0.8.0
>
>
> Scripting Processors cannot used Controller Services such as
> *DistributedMapCacheClientService* etc, as required dependencies are missing
> for *ExecuteScript* Nar. By adding following dependencies we can open new
> possibilities for Scripting Processors.
> Add this dependency to
> nifi/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/pom.xml
> {code}
> <dependency>
> <groupId>org.apache.nifi</groupId>
> <artifactId>nifi-standard-services-api-nar</artifactId>
> <type>nar</type>
> </dependency>
> {code}
> Add this dependency to
> nifi/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
> {code}
> <dependency>
> <groupId>org.apache.nifi</groupId>
> <artifactId>nifi-distributed-cache-client-service-api</artifactId>
> </dependency>
> {code}
> Then we can create scripting processor like:
> {code}
> import org.apache.nifi.controller.ControllerService
> import com.crossbusiness.nifi.processors.StringSerDe
> final StringSerDe stringSerDe = new StringSerDe();
> def lookup = context.controllerServiceLookup
> def cacheServiceName = DistributedMapCacheClientServiceName.value
> log.error "cacheServiceName: ${cacheServiceName}"
> def cacheServiceId =
> lookup.getControllerServiceIdentifiers(ControllerService).find {
> cs -> lookup.getControllerServiceName(cs) == cacheServiceName
> }
> log.error "cacheServiceId: ${cacheServiceId}"
> def cache = lookup.getControllerService(cacheServiceId)
> log.error cache.get("aaa", stringSerDe, stringSerDe )
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)