This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 21e5ebc840 NIFI-7343: Add support for SchemaRegistryService to
scripted components
21e5ebc840 is described below
commit 21e5ebc840a5e5149f83a3de98e41df0ed999d05
Author: Matt Burgess <[email protected]>
AuthorDate: Tue Jul 4 19:54:47 2023 -0400
NIFI-7343: Add support for SchemaRegistryService to scripted components
This closes #7467
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-scripting-nar/pom.xml | 8 ++++++
.../nifi-scripting-processors/pom.xml | 4 +++
.../apache/nifi/record/script/ScriptedReader.java | 33 ++++++++++++++++++++++
.../record/script/ScriptedRecordSetWriter.java | 33 ++++++++++++++++++++++
.../resources/groovy/test_lookup_inline.groovy | 11 +++++++-
.../groovy/test_record_reader_inline.groovy | 12 ++++++++
.../groovy/test_record_writer_inline.groovy | 11 ++++++++
nifi-nar-bundles/nifi-scripting-bundle/pom.xml | 6 ++++
8 files changed, 117 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/pom.xml
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/pom.xml
index 8fb1b39500..bf39986614 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-nar/pom.xml
@@ -50,6 +50,14 @@
<scope>runtime</scope>
</dependency>
+ <!-- nifi-avro-record-utils dependency to include in the NAR for
processors to use at runtime -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <type>jar</type>
+ <scope>runtime</scope>
+ </dependency>
+
<!-- Groovy dependencies to include in the NAR for processors to use
at runtime -->
<dependency>
<groupId>org.codehaus.groovy</groupId>
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
index 716f3f4ed5..0754246b69 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
@@ -34,6 +34,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-metrics</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
index d0932df33c..5056f75329 100644
---
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
+++
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
@@ -63,7 +63,40 @@ public class ScriptedReader extends
AbstractScriptedRecordFactory<RecordReaderFa
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
+ synchronized (scriptingComponentHelper.isInitialized) {
+ if (!scriptingComponentHelper.isInitialized.get()) {
+ scriptingComponentHelper.createResources();
+ }
+ }
super.onEnabled(context);
+
+ // Call a non-interface method onEnabled(context), to allow a
scripted RecordReaderFactory the chance to set up as necessary
+ if (scriptRunner != null) {
+ final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+ final Invocable invocable = (Invocable) scriptEngine;
+ if (configurationContext != null) {
+ try {
+ // Get the actual object from the script engine, versus
the proxy held for the RecordReaderFactory. The object may have additional methods,
+ // where the RecordReaderFactory is a proxied interface
+ final Object obj = scriptEngine.get("reader");
+ if (obj != null) {
+ try {
+ invocable.invokeMethod(obj, "onEnabled", context);
+ } catch (final NoSuchMethodException nsme) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Configured script for
ScriptedReader does not contain an onEnabled() method.");
+ }
+ }
+ } else {
+ throw new ScriptException("No ScriptedReader was
defined by the script.");
+ }
+ } catch (ScriptException se) {
+ throw new ProcessException("Error executing
onEnabled(context) method", se);
+ }
+ }
+ } else {
+ throw new ProcessException("Error creating ScriptRunner");
+ }
}
@Override
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
index 1e8a7d31ea..d5df34540a 100644
---
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
+++
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
@@ -64,7 +64,40 @@ public class ScriptedRecordSetWriter extends
AbstractScriptedRecordFactory<Recor
@Override
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
+ synchronized (scriptingComponentHelper.isInitialized) {
+ if (!scriptingComponentHelper.isInitialized.get()) {
+ scriptingComponentHelper.createResources();
+ }
+ }
super.onEnabled(context);
+
+ // Call a non-interface method onEnabled(context), to allow a
scripted RecordSetWriterFactory the chance to set up as necessary
+ if (scriptRunner != null) {
+ final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+ final Invocable invocable = (Invocable) scriptEngine;
+ if (configurationContext != null) {
+ try {
+ // Get the actual object from the script engine, versus
the proxy held for the RecordSetWriterFactory. The object may have additional methods,
+ // where the RecordSetWriterFactory is a proxied interface
+ final Object obj = scriptEngine.get("writer");
+ if (obj != null) {
+ try {
+ invocable.invokeMethod(obj, "onEnabled", context);
+ } catch (final NoSuchMethodException nsme) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Configured script for
ScriptedRecordSetWriter does not contain an onEnabled() method.");
+ }
+ }
+ } else {
+ throw new ScriptException("No ScriptedRecordSetWriter
was defined by the script.");
+ }
+ } catch (ScriptException se) {
+ throw new ProcessException("Error executing
onEnabled(context) method", se);
+ }
+ }
+ } else {
+ throw new ProcessException("Error creating ScriptRunner");
+ }
}
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy
index 8285b62d4b..0c2acc10b9 100644
---
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy
+++
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-import java.util.Set
+import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.reporting.InitializationException
@@ -28,6 +28,15 @@ class GroovyLookupService implements LookupService<String> {
'World': 'there'
]
+ ComponentLog logger;
+
+ void setLogger(ComponentLog logger) {
+ this.logger = logger
+ }
+
+ void onEnabled(final ConfigurationContext context) {
+ logger.info("in onEnabled")
+ }
@Override
Optional<String> lookup(Map<String, String> coordinates) {
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
index 783f5202a5..a7667559b6 100644
---
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
+++
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
@@ -16,6 +16,7 @@
*/
import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
@@ -27,6 +28,7 @@ import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
+import org.apache.nifi.serialization.SchemaRegistryService
class GroovyRecordReader implements RecordReader {
@@ -57,6 +59,16 @@ class GroovyRecordReader implements RecordReader {
class GroovyRecordReaderFactory extends AbstractControllerService implements
RecordReaderFactory {
+ ComponentLog logger;
+
+ void setLogger(ComponentLog logger) {
+ this.logger = logger
+ }
+
+ void onEnabled(final ConfigurationContext context) {
+ logger.info("in onEnabled")
+ }
+
RecordReader createRecordReader(Map<String, String> variables, InputStream
inputStream, long inputLength, ComponentLog logger) throws
MalformedRecordException, IOException, SchemaNotFoundException {
return new GroovyRecordReader()
}
diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
index 569fa6839a..99cb1df80a 100644
---
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
+++
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
@@ -18,6 +18,7 @@
import groovy.xml.MarkupBuilder
import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
@@ -98,6 +99,16 @@ class GroovyRecordSetWriter implements RecordSetWriter {
class GroovyRecordSetWriterFactory extends AbstractControllerService
implements RecordSetWriterFactory {
+ ComponentLog logger;
+
+ void setLogger(ComponentLog logger) {
+ this.logger = logger
+ }
+
+ void onEnabled(final ConfigurationContext context) {
+ logger.info("in onEnabled")
+ }
+
@Override
RecordSchema getSchema(Map<String, String> variables, RecordSchema
readSchema) throws SchemaNotFoundException, IOException {
return null
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/pom.xml
b/nifi-nar-bundles/nifi-scripting-bundle/pom.xml
index e264ba6c25..02eee63483 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-scripting-bundle/pom.xml
@@ -71,6 +71,12 @@
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>