This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 584323bc81 NIFI-14157 Allow InvokeScriptedProcessor scripts to implement OnPrimaryNodeStateChange (#9632)
584323bc81 is described below

commit 584323bc81c5a7bb1d8e156393d9bb6508560cae
Author: Matt Burgess <[email protected]>
AuthorDate: Fri Jan 24 17:53:42 2025 -0500

    NIFI-14157 Allow InvokeScriptedProcessor scripts to implement OnPrimaryNodeStateChange (#9632)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/script/InvokeScriptedProcessor.java |  9 +++-
 .../nifi/processors/script/TestInvokeGroovy.java   | 23 ++++++++++
 .../groovy/test_OnPrimaryStateChange.groovy        | 52 ++++++++++++++++++++++
 .../nifi/util/StandardProcessorTestRunner.java     | 10 +++++
 4 files changed, 93 insertions(+), 1 deletion(-)

diff --git a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
index 74215f6df1..bf7f01e086 100644
--- a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
+++ b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
@@ -30,6 +30,8 @@ import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.RequiredPermission;
 import org.apache.nifi.components.ValidationContext;
@@ -212,7 +214,6 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
     public void setup(final ProcessContext context) {
         scriptingComponentHelper.setupVariables(context);
         setup();
-
         invokeScriptedProcessorMethod("onScheduled", context);
     }
 
@@ -232,6 +233,12 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
         }
     }
 
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+
+        invokeScriptedProcessorMethod("onPrimaryNodeStateChange", newState);
+    }
+
     /**
      * Handles changes to this processor's properties. If changes are made to
      * script- or engine-related properties, the script will be reloaded.
diff --git a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
index ffb028b3fd..6269cad094 100644
--- a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
+++ b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java
@@ -241,6 +241,29 @@ public class TestInvokeGroovy extends BaseScriptTest {
         ff.assertContentEquals("48\n47\n14\n");
     }
 
+    /**
+     * Tests a script that has a Groovy Processor that implements its own 
onPrimaryNodeStateChange
+     */
+    @Test
+    public void testOnPrimaryNodeStateChange() {
+        
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE,
 "Groovy");
+        runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, 
"target/test/resources/groovy/test_OnPrimaryStateChange.groovy");
+        runner.setProperty(ScriptingComponentUtils.MODULES, 
"target/test/resources/groovy");
+        InvokeScriptedProcessor invokeScriptedProcessor = 
((InvokeScriptedProcessor) scriptingComponent);
+        invokeScriptedProcessor.setup(runner.getProcessContext());
+        runner.setIsConfiguredForClustering(true);
+        runner.run(1, false, true);
+        runner.setPrimaryNode(true);
+        runner.clearTransferState();
+        runner.run(1, true, false);
+        runner.assertAllFlowFilesTransferred("success");
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship("success");
+        assertNotNull(flowFiles);
+        assertEquals(1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        flowFile.assertAttributeEquals("isPrimaryNode", "true");
+    }
+
     private static class OverrideInvokeScriptedProcessor extends 
InvokeScriptedProcessor {
 
         private int numTimesModifiedCalled = 0;
diff --git a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy
new file mode 100644
index 0000000000..bb2b82ff6b
--- /dev/null
+++ b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange
+import org.apache.nifi.annotation.notification.PrimaryNodeState
+import org.apache.nifi.processor.AbstractProcessor
+import org.apache.nifi.processor.ProcessContext
+import org.apache.nifi.processor.ProcessSession
+import org.apache.nifi.processor.Relationship
+
+
+class MyRecordProcessor extends AbstractProcessor {
+
+    def REL_SUCCESS = new 
Relationship.Builder().name("success").description('FlowFiles that were 
successfully processed are routed here').build()
+    def REL_FAILURE = new 
Relationship.Builder().name("failure").description('FlowFiles are routed here 
if an error occurs during processing').build()
+
+    static boolean primaryNode = false
+
+    @OnPrimaryNodeStateChange
+    void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+        primaryNode = true
+    }
+
+    @Override
+    Set<Relationship> getRelationships() {
+        [REL_SUCCESS, REL_FAILURE] as Set<Relationship>
+    }
+
+    @Override
+    void onTrigger(ProcessContext context, ProcessSession session) {
+        def flowFile = session.create()
+        session.putAttribute(flowFile, 'isPrimaryNode', primaryNode.toString())
+        session.transfer(flowFile, REL_SUCCESS)
+    }
+}
+
+processor = new MyRecordProcessor()
\ No newline at end of file
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 7f1d848075..292c7a9e37 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -26,6 +26,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -979,6 +981,14 @@ public class StandardProcessorTestRunner implements TestRunner {
 
     @Override
     public void setPrimaryNode(boolean primaryNode) {
+        if (context.isPrimary() != primaryNode) {
+            try {
+                
ReflectionUtils.invokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, 
processor,
+                        primaryNode ? PrimaryNodeState.ELECTED_PRIMARY_NODE : 
PrimaryNodeState.PRIMARY_NODE_REVOKED);
+            } catch (final Exception e) {
+                Assertions.fail("Could not invoke methods annotated with 
@OnPrimaryNodeStateChange annotation due to: " + e);
+            }
+        }
         context.setPrimaryNode(primaryNode);
     }
 

Reply via email to