Repository: nifi Updated Branches: refs/heads/master 1e56de952 -> 77ab5d368
http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/compress.py ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/compress.py b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/compress.py new file mode 100755 index 0000000..51409a9 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/compress.py @@ -0,0 +1,49 @@ +#! /usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
#

import sys
import traceback
import bz2
from jarray import zeros
from org.python.core.util import StringUtil
from org.apache.nifi.processor.io import StreamCallback


class Compress(StreamCallback):
    """Stream callback that bzip2-compresses its input onto its output."""

    def __init__(self):
        pass

    def process(self, input, output):
        """Compress everything readable from ``input`` and write it to ``output``.

        ``input`` is drained in chunks of up to 8192 bytes until ``read``
        reports -1.  Each chunk is fed to a ``bz2.BZ2Compressor`` and whatever
        the compressor returns is written to ``output``; after the last chunk
        the compressor is flushed and that remainder is written as well.

        NOTE(review): ``input``/``output`` are presumably the Java streams
        NiFi hands to a StreamCallback -- confirm against the caller.

        Raises:
            Whatever the read, compress or write step raises.  The traceback
            is printed to stdout first and the exception is then re-raised.
        """
        try:
            compressor = bz2.BZ2Compressor()
            buf = zeros(8192, "b")
            while True:
                bytes_read = input.read(buf)
                if bytes_read == -1:
                    break
                # Only the first bytes_read entries of buf belong to this chunk.
                chunk = StringUtil.fromBytes(buf, 0, bytes_read)
                output.write(compressor.compress(chunk))
            # compress() may hold data back; flush() returns what is left.
            output.write(compressor.flush())
        except:
            # Bare except is intentional: report anything, then re-raise it.
            print("Exception in Compress:")
            print('-' * 60)
            traceback.print_exc(file=sys.stdout)
            print('-' * 60)
            raise


# http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/decompress.py
# ----------------------------------------------------------------------
# diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/decompress.py b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/decompress.py
# new file mode 100755
# index 0000000..a45ef92
# --- /dev/null
# +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/decompress.py
# @@ -0,0 +1,48 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
import traceback
import bz2
from jarray import zeros
from org.python.core.util import StringUtil
from org.apache.nifi.processor.io import StreamCallback


class Decompress(StreamCallback):
    """Stream callback that bzip2-decompresses its input onto its output."""

    def __init__(self):
        pass

    def process(self, input, output):
        """Decompress everything readable from ``input`` and write it to ``output``.

        ``input`` is drained in chunks of up to 8192 bytes until ``read``
        reports -1.  Each chunk is fed to a ``bz2.BZ2Decompressor`` and the
        data it returns is written to ``output`` straight away.

        NOTE(review): ``input``/``output`` are presumably the Java streams
        NiFi hands to a StreamCallback -- confirm against the caller.

        Raises:
            Whatever the read, decompress or write step raises.  The traceback
            is printed to stdout first and the exception is then re-raised.
        """
        try:
            decompressor = bz2.BZ2Decompressor()
            buf = zeros(8192, "b")
            while True:
                bytes_read = input.read(buf)
                if bytes_read == -1:
                    break
                # Only the first bytes_read entries of buf belong to this chunk.
                chunk = StringUtil.fromBytes(buf, 0, bytes_read)
                output.write(decompressor.decompress(chunk))
        except:
            # Bare except is intentional: report anything, then re-raise it.
            print("Exception in Decompress:")
            print('-' * 60)
            traceback.print_exc(file=sys.stdout)
            print('-' * 60)
            raise


# http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/read_first_line.py
# ----------------------------------------------------------------------
# diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/read_first_line.py b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/read_first_line.py
# new file mode 100755
# index 0000000..a66909b
# --- /dev/null
# +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/callbacks/read_first_line.py
# @@ -0,0 +1,50 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.
# See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
import traceback
from org.apache.nifi.processor.io import InputStreamCallback
from java.io import BufferedReader, InputStreamReader


class ReadFirstLine(InputStreamCallback):
    """Input-stream callback that remembers the first line of the stream."""

    # Result of the most recent process() call; None until process() has run.
    __line = None

    def __init__(self):
        pass

    def getLine(self):
        """Return the value stored by the last ``process`` call (initially None)."""
        return self.__line

    def process(self, input):
        """Read one line from ``input`` and keep it for ``getLine``.

        ``input`` is wrapped in an ``InputStreamReader`` and a
        ``BufferedReader``; the result of a single ``readLine()`` call is
        stored on the instance.  Both readers are closed afterwards, whether
        or not reading succeeded.

        Raises:
            Whatever constructing the readers or ``readLine()`` raises.  The
            traceback is printed to stdout first and the exception re-raised.
        """
        # Bind both names before the try block so the finally clause can test
        # them even when one of the constructors below raises; otherwise the
        # cleanup itself fails with an unbound-name error that replaces the
        # real exception.
        reader = None
        bufferedReader = None
        try:
            reader = InputStreamReader(input)
            bufferedReader = BufferedReader(reader)
            self.__line = bufferedReader.readLine()
        except:
            # Bare except is intentional: report anything, then re-raise it.
            print("Exception in Reader:")
            print('-' * 60)
            traceback.print_exc(file=sys.stdout)
            print('-' * 60)
            raise
        finally:
            if bufferedReader is not None:
                bufferedReader.close()
            if reader is not None:
                reader.close()


# http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_compress.py
# ----------------------------------------------------------------------
# diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_compress.py b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_compress.py
# new file mode 100755
# index 0000000..e69b8fa
# --- /dev/null
# +++
# b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_compress.py
# @@ -0,0 +1,76 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
import traceback
from callbacks import Compress, Decompress
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor


class CompressFlowFile(Processor):
    """Processor that rewrites a flow file through Compress or Decompress.

    The direction is chosen by the required "mode" property, whose allowable
    values are "compress" and "decompress".
    """

    # The only relationship this processor exposes and transfers to.
    __rel_success = Relationship.Builder().description("Success").name("success").build()

    def __init__(self):
        pass

    def initialize(self, context):
        pass

    def getRelationships(self):
        """Return a set holding only the "success" relationship."""
        return set([self.__rel_success])

    def validate(self, context):
        pass

    def getPropertyDescriptors(self):
        """Return a one-element list describing the required "mode" property."""
        descriptor = PropertyDescriptor.Builder().name("mode").allowableValues("compress", "decompress").required(True).build()
        return [descriptor]

    def onPropertyModified(self, descriptor, newValue, oldValue):
        pass

    def onTrigger(self, context, sessionFactory):
        """Compress or decompress one flow file and send it to "success".

        A session is created from ``sessionFactory``.  If it yields no flow
        file the method returns without doing anything else.  Otherwise the
        content is rewritten with ``Compress`` when the "mode" property equals
        "compress" and with ``Decompress`` for any other value, the flow file
        is transferred to the success relationship and the session committed.

        Raises:
            Whatever any of those steps raises.  The exception type and the
            traceback are printed to stdout, the session is rolled back with
            ``rollback(True)``, and the exception is re-raised.
        """
        session = sessionFactory.createSession()
        try:
            # ensure there is work to do
            flowfile = session.get()
            if flowfile is None:
                return

            if context.getProperty("mode").getValue() == "compress":
                flowfile = session.write(flowfile, Compress())
            else:
                flowfile = session.write(flowfile, Decompress())

            # transfer
            session.transfer(flowfile, self.__rel_success)
            session.commit()
        except:
            # Bare except is intentional: report anything, then re-raise it.
            print(sys.exc_info()[0])
            print("Exception in CompressFlowFile:")
            print('-' * 60)
            traceback.print_exc(file=sys.stdout)
            print('-' * 60)

            # Python spells the boolean literal `True`.  A lowercase `true` is
            # an undefined name: it raises NameError on this line, so the
            # rollback never runs and the original exception is hidden.
            session.rollback(True)
            raise


processor = CompressFlowFile()
# \ No newline at end of file

# http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_invalid.py
# ----------------------------------------------------------------------
# diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_invalid.py b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_invalid.py
# new file mode 100755
# index 0000000..6d36c8d
# --- /dev/null
# +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_invalid.py
# @@ -0,0 +1,48 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from org.apache.nifi.processor import Processor
from org.apache.nifi.components import ValidationResult


class AlwaysInvalid(Processor):
    """Processor whose ``validate`` always reports one failing result.

    Every other method is an empty stub that returns None.
    """

    def __init__(self):
        pass

    def initialize(self, context):
        return None

    def getRelationships(self):
        return None

    def validate(self, context):
        """Return a one-element list holding a failing ValidationResult."""
        builder = ValidationResult.Builder().subject("Processor Error")
        builder = builder.valid(False).explanation("Never valid.")
        return [builder.build()]

    def getPropertyDescriptors(self):
        return None

    def onPropertyModified(self, descriptor, newValue, oldValue):
        return None

    def onTrigger(self, context, sessionFactory):
        return None


processor = AlwaysInvalid()
# \ No newline at end of file

# http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_reader.py
# ----------------------------------------------------------------------
# diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_reader.py b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_reader.py
# new file mode 100755
# index 0000000..fad6d37
# --- /dev/null
# +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_reader.py
# @@ -0,0 +1,74 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
import traceback
from callbacks import ReadFirstLine
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship


class ReadContentAndStoreAsAttribute(Processor):
    """Processor that copies a flow file's first line into an attribute.

    The line is obtained through a ``ReadFirstLine`` callback and stored
    under the "from-content" attribute.
    """

    # The only relationship this processor exposes and transfers to.
    __rel_success = Relationship.Builder().description("Success").name("success").build()

    def __init__(self):
        pass

    def initialize(self, context):
        return None

    def getRelationships(self):
        """Return a set holding only the "success" relationship."""
        relationships = set([self.__rel_success])
        return relationships

    def validate(self, context):
        return None

    def getPropertyDescriptors(self):
        return None

    def onPropertyModified(self, descriptor, newValue, oldValue):
        return None

    def onTrigger(self, context, sessionFactory):
        """Tag one flow file with its first line and send it to "success".

        Returns without further work when the session yields no flow file.
        On any failure the exception type and traceback are printed to
        stdout, the session is rolled back with ``rollback(True)``, and the
        exception is re-raised.
        """
        session = sessionFactory.createSession()
        try:
            # nothing queued means nothing to do
            flowfile = session.get()
            if flowfile is None:
                return

            first_line_callback = ReadFirstLine()
            session.read(flowfile, first_line_callback)

            # record what the callback captured, then route and commit
            first_line = first_line_callback.getLine()
            flowfile = session.putAttribute(flowfile, "from-content", first_line)
            session.transfer(flowfile, self.__rel_success)
            session.commit()
        except:
            print(sys.exc_info()[0])
            print("Exception in TestReader:")
            separator = '-' * 60
            print(separator)
            traceback.print_exc(file=sys.stdout)
            print(separator)

            session.rollback(True)
            raise


processor = ReadContentAndStoreAsAttribute()
# \ No newline at end of file

# http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py
# ----------------------------------------------------------------------
# diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py new file mode 100755 index 0000000..a333f7a --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py @@ -0,0 +1,79 @@ +#! /usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
#

import sys
import traceback
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators


class UpdateAttributes(Processor):
    """Processor that copies a property and an attribute onto a flow file.

    Both values are looked up under the name "for-attributes": once as a
    processor property and once as an attribute of the incoming flow file.
    """

    # The only relationship this processor exposes and transfers to.
    __rel_success = Relationship.Builder().description("Success").name("success").build()

    def __init__(self):
        pass

    def initialize(self, context):
        pass

    def getRelationships(self):
        """Return a set holding only the "success" relationship."""
        return set([self.__rel_success])

    def validate(self, context):
        pass

    def getPropertyDescriptors(self):
        """Return a one-element list describing the "for-attributes" property."""
        descriptor = PropertyDescriptor.Builder().name("for-attributes").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
        return [descriptor]

    def onPropertyModified(self, descriptor, newValue, oldValue):
        pass

    def onTrigger(self, context, sessionFactory):
        """Set "from-property" and "from-attribute" on one flow file.

        A session is created from ``sessionFactory``.  If it yields no flow
        file the method returns without doing anything else.  Otherwise the
        "for-attributes" property value is stored as the "from-property"
        attribute, the flow file's own "for-attributes" attribute is stored as
        "from-attribute", the flow file is transferred to the success
        relationship and the session is committed.

        Raises:
            Whatever any of those steps raises.  The exception type and the
            traceback are printed to stdout, the session is rolled back with
            ``rollback(True)``, and the exception is re-raised.
        """
        session = sessionFactory.createSession()
        try:
            # ensure there is work to do
            flowfile = session.get()
            if flowfile is None:
                return

            # extract some attribute values
            fromPropertyValue = context.getProperty("for-attributes").getValue()
            fromAttributeValue = flowfile.getAttribute("for-attributes")

            # set an attribute
            flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
            flowfile = session.putAttribute(flowfile, "from-attribute", fromAttributeValue)

            # transfer
            session.transfer(flowfile, self.__rel_success)
            session.commit()
        except:
            # Bare except is intentional: report anything, then re-raise it.
            print(sys.exc_info()[0])
            print("Exception in UpdateAttributes:")
            print('-' * 60)
            traceback.print_exc(file=sys.stdout)
            print('-' * 60)

            # Python spells the boolean literal `True`.  A lowercase `true` is
            # an undefined name: it raises NameError on this line, so the
            # rollback never runs and the original exception is hidden.
            session.rollback(True)
            raise


processor = UpdateAttributes()
# \ No newline at end of file

# http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/lua/test_onTrigger.lua
# ----------------------------------------------------------------------
# diff --git
-- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/lua/test_onTrigger.lua b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/lua/test_onTrigger.lua
-- new file mode 100644
-- index 0000000..687cae1
-- --- /dev/null
-- +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/lua/test_onTrigger.lua
-- @@ -0,0 +1,21 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Sets the "from-content" attribute of one flow file to the fixed text
-- "test content" and transfers it to REL_SUCCESS.
-- NOTE(review): `session` and `REL_SUCCESS` are not defined in this script;
-- presumably the hosting script processor injects them -- confirm.
flowFile = session:get()
-- Nothing to process: leave the script without touching the session further.
if flowFile == nil then
    return
end
-- putAttribute's result is reassigned and used for the transfer below.
flowFile = session:putAttribute(flowFile, "from-content", "test content")
session:transfer(flowFile, REL_SUCCESS)
-- \ No newline at end of file

-- http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/nifi-scripting-bundle/pom.xml
-- ----------------------------------------------------------------------
-- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/pom.xml b/nifi-nar-bundles/nifi-scripting-bundle/pom.xml
-- new file mode 100644
-- index 0000000..f6763fd
-- --- /dev/null
-- +++ b/nifi-nar-bundles/nifi-scripting-bundle/pom.xml
-- @@ -0,0 +1,43 @@
-- <?xml version="1.0" encoding="UTF-8"?>
-- <!--
--   Licensed to the Apache Software Foundation (ASF) under one or more
--   contributor license agreements.
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>0.4.2-SNAPSHOT</version> + </parent> + + <artifactId>nifi-scripting-bundle</artifactId> + <packaging>pom</packaging> + + <modules> + <module>nifi-scripting-processors</module> + <module>nifi-scripting-nar</module> + </modules> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-scripting-processors</artifactId> + <version>0.4.2-SNAPSHOT</version> + </dependency> + </dependencies> + </dependencyManagement> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 3bc915b..646ae1c 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -51,6 +51,7 @@ <module>nifi-ldap-iaa-providers-bundle</module> <module>nifi-riemann-bundle</module> <module>nifi-html-bundle</module> + <module>nifi-scripting-bundle</module> 
</modules> <dependencyManagement> <dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/77ab5d36/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b00890b..563ecc3 100644 --- a/pom.xml +++ b/pom.xml @@ -583,9 +583,9 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.activemq</groupId> - <artifactId>activemq-broker</artifactId> + <artifactId>activemq-broker</artifactId> <version>5.12.1</version> - <scope>tests</scope> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.lucene</groupId> @@ -1028,6 +1028,12 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-scripting-nar</artifactId> + <version>0.4.2-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-properties</artifactId> <version>0.4.2-SNAPSHOT</version> </dependency> @@ -1409,8 +1415,8 @@ language governing permissions and limitations under the License. --> <profiles> <profile> <!-- Performs execution of Integration Tests using the Maven FailSafe Plugin. The view of integration tests in this context - are those tests interfacing with external sources and services requiring additional resources or credentials that cannot - be explicitly provided. --> + are those tests interfacing with external sources and services requiring additional resources or credentials that cannot + be explicitly provided. --> <id>integration-tests</id> <build> <plugins>
