This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 584323bc81 NIFI-14157 Allow InvokeScriptedProcessor scripts to
implement OnPrimaryNodeStateChange (#9632)
584323bc81 is described below
commit 584323bc81c5a7bb1d8e156393d9bb6508560cae
Author: Matt Burgess <[email protected]>
AuthorDate: Fri Jan 24 17:53:42 2025 -0500
NIFI-14157 Allow InvokeScriptedProcessor scripts to implement
OnPrimaryNodeStateChange (#9632)
Signed-off-by: David Handermann <[email protected]>
---
.../processors/script/InvokeScriptedProcessor.java | 9 +++-
.../nifi/processors/script/TestInvokeGroovy.java | 23 ++++++++++
.../groovy/test_OnPrimaryStateChange.groovy | 52 ++++++++++++++++++++++
.../nifi/util/StandardProcessorTestRunner.java | 10 +++++
4 files changed, 93 insertions(+), 1 deletion(-)
diff --git
a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
index 74215f6df1..bf7f01e086 100644
---
a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
+++
b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
@@ -30,6 +30,8 @@ import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
@@ -212,7 +214,6 @@ public class InvokeScriptedProcessor extends
AbstractSessionFactoryProcessor {
public void setup(final ProcessContext context) {
scriptingComponentHelper.setupVariables(context);
setup();
-
invokeScriptedProcessorMethod("onScheduled", context);
}
@@ -232,6 +233,12 @@ public class InvokeScriptedProcessor extends
AbstractSessionFactoryProcessor {
}
}
+ @OnPrimaryNodeStateChange
+ public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+
+ invokeScriptedProcessorMethod("onPrimaryNodeStateChange", newState);
+ }
+
/**
* Handles changes to this processor's properties. If changes are made to
* script- or engine-related properties, the script will be reloaded.
diff --git
a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
index ffb028b3fd..6269cad094 100644
---
a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
+++
b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
@@ -241,6 +241,29 @@ public class TestInvokeGroovy extends BaseScriptTest {
ff.assertContentEquals("48\n47\n14\n");
}
+ /**
+ * Tests a script that has a Groovy Processor that implements its own
onPrimaryNodeStateChange
+ */
+ @Test
+ public void testOnPrimaryNodeStateChange() {
+
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE,
"Groovy");
+ runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE,
"target/test/resources/groovy/test_OnPrimaryStateChange.groovy");
+ runner.setProperty(ScriptingComponentUtils.MODULES,
"target/test/resources/groovy");
+ InvokeScriptedProcessor invokeScriptedProcessor =
((InvokeScriptedProcessor) scriptingComponent);
+ invokeScriptedProcessor.setup(runner.getProcessContext());
+ runner.setIsConfiguredForClustering(true);
+ runner.run(1, false, true);
+ runner.setPrimaryNode(true);
+ runner.clearTransferState();
+ runner.run(1, true, false);
+ runner.assertAllFlowFilesTransferred("success");
+ List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship("success");
+ assertNotNull(flowFiles);
+ assertEquals(1, flowFiles.size());
+ MockFlowFile flowFile = flowFiles.get(0);
+ flowFile.assertAttributeEquals("isPrimaryNode", "true");
+ }
+
private static class OverrideInvokeScriptedProcessor extends
InvokeScriptedProcessor {
private int numTimesModifiedCalled = 0;
diff --git
a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy
b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy
new file mode 100644
index 0000000000..bb2b82ff6b
--- /dev/null
+++
b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange
+import org.apache.nifi.annotation.notification.PrimaryNodeState
+import org.apache.nifi.processor.AbstractProcessor
+import org.apache.nifi.processor.ProcessContext
+import org.apache.nifi.processor.ProcessSession
+import org.apache.nifi.processor.Relationship
+
+
+class MyRecordProcessor extends AbstractProcessor {
+
+ def REL_SUCCESS = new
Relationship.Builder().name("success").description('FlowFiles that were
successfully processed are routed here').build()
+ def REL_FAILURE = new
Relationship.Builder().name("failure").description('FlowFiles are routed here
if an error occurs during processing').build()
+
+ static boolean primaryNode = false
+
+ @OnPrimaryNodeStateChange
+ void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+ primaryNode = true
+ }
+
+ @Override
+ Set<Relationship> getRelationships() {
+ [REL_SUCCESS, REL_FAILURE] as Set<Relationship>
+ }
+
+ @Override
+ void onTrigger(ProcessContext context, ProcessSession session) {
+ def flowFile = session.create()
+ session.putAttribute(flowFile, 'isPrimaryNode', primaryNode.toString())
+ session.transfer(flowFile, REL_SUCCESS)
+ }
+}
+
+processor = new MyRecordProcessor()
\ No newline at end of file
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 7f1d848075..292c7a9e37 100644
---
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -26,6 +26,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -979,6 +981,14 @@ public class StandardProcessorTestRunner implements
TestRunner {
@Override
public void setPrimaryNode(boolean primaryNode) {
+ if (context.isPrimary() != primaryNode) {
+ try {
+
ReflectionUtils.invokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class,
processor,
+ primaryNode ? PrimaryNodeState.ELECTED_PRIMARY_NODE :
PrimaryNodeState.PRIMARY_NODE_REVOKED);
+ } catch (final Exception e) {
+ Assertions.fail("Could not invoke methods annotated with
@OnPrimaryNodeStateChange annotation due to: " + e);
+ }
+ }
context.setPrimaryNode(primaryNode);
}