Repository: nifi
Updated Branches:
  refs/heads/master dbef69040 -> 0973c2d8d


NIFI-543 Added annotation to restrict processor to run only on the primary node
- PR Fix - 'Execution' dropdown will now be shown in all cases
- Annotated ListGCSBucket with PrimaryNodeOnly

This closes #2509.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0973c2d8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0973c2d8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0973c2d8

Branch: refs/heads/master
Commit: 0973c2d8d13940323ebb30eaa383608bf9e7cbb5
Parents: dbef690
Author: zenfenan <[email protected]>
Authored: Sat Mar 3 21:14:51 2018 +0530
Committer: Mark Payne <[email protected]>
Committed: Fri May 25 11:50:29 2018 -0400

----------------------------------------------------------------------
 .../annotation/behavior/PrimaryNodeOnly.java    | 36 ++++++++++++++++++++
 .../src/main/asciidoc/developer-guide.adoc      |  6 ++++
 .../apache/nifi/processors/aws/s3/ListS3.java   |  2 ++
 .../azure/storage/ListAzureBlobStorage.java     |  2 ++
 .../nifi/web/api/dto/ProcessorConfigDTO.java    |  2 +-
 .../apache/nifi/web/api/dto/ProcessorDTO.java   | 14 ++++++++
 .../apache/nifi/controller/ProcessorNode.java   |  2 ++
 .../nifi/controller/ProcessorDetails.java       |  7 ++++
 .../nifi/controller/StandardProcessorNode.java  | 16 +++++++--
 .../DummyPrimaryNodeOnlyProcessor.java          | 35 +++++++++++++++++++
 .../nifi/controller/TestFlowController.java     | 11 ++++++
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  2 ++
 .../partials/canvas/processor-configuration.jsp |  2 +-
 .../js/nf/canvas/nf-processor-configuration.js  | 13 +++----
 .../main/webapp/js/nf/nf-processor-details.js   | 20 +++++------
 .../processors/gcp/storage/ListGCSBucket.java   |  2 ++
 .../apache/nifi/processors/hadoop/ListHDFS.java |  2 ++
 .../nifi/processors/standard/GetJMSTopic.java   |  2 ++
 .../processors/standard/ListDatabaseTables.java |  2 ++
 .../nifi/processors/standard/ListFTP.java       |  2 ++
 .../nifi/processors/standard/ListSFTP.java      |  2 ++
 21 files changed, 157 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/PrimaryNodeOnly.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/PrimaryNodeOnly.java
 
b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/PrimaryNodeOnly.java
new file mode 100644
index 0000000..19d43c2
--- /dev/null
+++ 
b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/PrimaryNodeOnly.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.annotation.behavior;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation a {@link org.apache.nifi.processor.Processor} 
implementation
+ * can use to indicate that the {@link 
org.apache.nifi.scheduling.ExecutionNode}
+ * of the processor is to be set to PRIMARY
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface PrimaryNodeOnly {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-docs/src/main/asciidoc/developer-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc 
b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index bde08be..97ac234 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -1751,6 +1751,12 @@ will handle your Processor:
                will always be set to `1`. This does *not*, however, mean that 
the Processor does not have to be thread-safe,
                as the thread that is executing `onTrigger` may change between 
invocations.
 
+    - `PrimaryNodeOnly`: Apache NiFi, when clustered, offers two modes of 
execution for Processors: "Primary Node" and
+        "All Nodes". Although running in all the nodes offers better 
parallelism, some Processors are known to cause unintended
+        behaviors when run in multiple nodes. For instance, some Processors 
list or read files from remote filesystems. If such
+        Processors are scheduled to run on "All Nodes", it will cause 
unnecessary duplication and even errors. Such Processors
+        should use this annotation. Applying this annotation will restrict the 
Processor to run only on the "Primary Node".
+
        - `TriggerWhenAnyDestinationAvailable`: By default, NiFi will not 
schedule a Processor to run if any of its outbound
                queues is full. This allows back-pressure to be applied all the 
way a chain of Processors. However, some Processors
                may need to run even if one of the outbound queues is full. 
This annotations indicates that the Processor should run

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 0fd5ab7..3e037f8 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
@@ -58,6 +59,7 @@ import com.amazonaws.services.s3.model.VersionListing;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
 
+@PrimaryNodeOnly
 @TriggerSerially
 @TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index d9df136..9f178fa 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -28,6 +28,7 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -57,6 +58,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+@PrimaryNodeOnly
 @TriggerSerially
 @Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
 @SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class, 
DeleteAzureBlobStorage.class })

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
index 476c039..780e8ac 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
@@ -107,7 +107,7 @@ public class ProcessorConfigDTO {
      * @return the amount of time that is used when this processor penalizes a 
flowfile
      */
     @ApiModelProperty(
-            value = "The amout of time that is used when the process penalizes 
a flowfile."
+            value = "The amount of time that is used when the process 
penalizes a flowfile."
     )
     public String getPenaltyDuration() {
         return penaltyDuration;

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
index 7fcdbbc..cd1fe41 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
@@ -46,6 +46,7 @@ public class ProcessorDTO extends ComponentDTO {
     private Boolean restricted;
     private Boolean deprecated;
     private Boolean isExtensionMissing;
+    private Boolean executionNodeRestricted;
     private Boolean multipleVersionsAvailable;
     private String inputRequirement;
 
@@ -335,4 +336,17 @@ public class ProcessorDTO extends ComponentDTO {
         this.description = description;
     }
 
+    /**
+     * @return whether or not this processor is restricted to run only in 
primary node
+     */
+    @ApiModelProperty(
+            value = "Indicates if the execution node of a processor is 
restricted to run only on the primary node"
+    )
+    public Boolean isExecutionNodeRestricted() {
+        return executionNodeRestricted;
+    }
+
+    public void setExecutionNodeRestricted(Boolean executionNodeRestricted) {
+        this.executionNodeRestricted = executionNodeRestricted;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 12c5ffc..cfe979a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -68,6 +68,8 @@ public abstract class ProcessorNode extends 
AbstractComponentNode implements Con
 
     public abstract boolean isEventDrivenSupported();
 
+    public abstract boolean isExecutionNodeRestricted();
+
     public abstract Requirement getInputRequirement();
 
     public abstract List<ActiveThreadInfo> getActiveThreads();

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
index 2a3e062..183dbaf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller;
 
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -39,6 +40,7 @@ public class ProcessorDetails {
     private final boolean triggerWhenAnyDestinationAvailable;
     private final boolean eventDrivenSupported;
     private final boolean batchSupported;
+    private final boolean executionNodeRestricted;
     private final InputRequirement.Requirement inputRequirement;
     private final TerminationAwareLogger componentLog;
     private final BundleCoordinate bundleCoordinate;
@@ -55,6 +57,7 @@ public class ProcessorDetails {
         this.triggeredSerially = 
procClass.isAnnotationPresent(TriggerSerially.class);
         this.triggerWhenAnyDestinationAvailable = 
procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
         this.eventDrivenSupported = 
procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && 
!triggerWhenEmpty;
+        this.executionNodeRestricted = 
procClass.isAnnotationPresent(PrimaryNodeOnly.class);
 
         final boolean inputRequirementPresent = 
procClass.isAnnotationPresent(InputRequirement.class);
         if (inputRequirementPresent) {
@@ -96,6 +99,10 @@ public class ProcessorDetails {
         return batchSupported;
     }
 
+    public boolean isExecutionNodeRestricted(){
+        return executionNodeRestricted;
+    }
+
     public InputRequirement.Requirement getInputRequirement() {
         return inputRequirement;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index c8e0570..5d6b3da 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -193,7 +193,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         onScheduleTimeoutMillis = timeoutString == null ? 60000 : 
FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
 
         schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
-        executionNode = ExecutionNode.ALL;
+        executionNode = isExecutionNodeRestricted() ? ExecutionNode.PRIMARY : 
ExecutionNode.ALL;
         this.hashCode = new HashCodeBuilder(7, 
67).append(identifier).toHashCode();
 
         try {
@@ -365,6 +365,14 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
+     *  Indicates whether the processor's executionNode configuration is 
restricted to run only in primary node
+     */
+    @Override
+    public boolean isExecutionNodeRestricted(){
+        return processorRef.get().isExecutionNodeRestricted();
+    }
+
+    /**
      * Indicates whether flow file content made by this processor must be
      * persisted
      *
@@ -530,7 +538,11 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public synchronized void setExecutionNode(final ExecutionNode 
executionNode) {
-        this.executionNode = executionNode;
+        if (this.isExecutionNodeRestricted()) {
+            this.executionNode = ExecutionNode.PRIMARY;
+        } else {
+            this.executionNode = executionNode;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyPrimaryNodeOnlyProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyPrimaryNodeOnlyProcessor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyPrimaryNodeOnlyProcessor.java
new file mode 100644
index 0000000..572d655
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyPrimaryNodeOnlyProcessor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+        import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+        import org.apache.nifi.processor.AbstractProcessor;
+        import org.apache.nifi.processor.ProcessContext;
+        import org.apache.nifi.processor.ProcessSession;
+        import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Dummy processor implementation to test {@link PrimaryNodeOnly} marker 
annotation
+ */
+@PrimaryNodeOnly
+public class DummyPrimaryNodeOnlyProcessor extends AbstractProcessor{
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index fd3f83b..d34a6de 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -52,6 +52,7 @@ import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.api.dto.BundleDTO;
@@ -524,6 +525,13 @@ public class TestFlowController {
     }
 
     @Test
+    public void testPrimaryNodeOnlyAnnotation() throws 
ProcessorInstantiationException {
+        String id = UUID.randomUUID().toString();
+        ProcessorNode processorNode = 
controller.createProcessor(DummyPrimaryNodeOnlyProcessor.class.getName(), id, 
systemBundle.getBundleDetails().getCoordinate());
+        assertEquals(ExecutionNode.PRIMARY, processorNode.getExecutionNode());
+    }
+
+    @Test
     public void testDeleteProcessGroup() {
         ProcessGroup pg = controller.createProcessGroup("my-process-group");
         pg.setName("my-process-group");
@@ -760,6 +768,7 @@ public class TestFlowController {
         
processorDTO.setInputRequirement(processorNode.getInputRequirement().name());
         
processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class));
         processorDTO.setRestricted(processorNode.isRestricted());
+        
processorDTO.setExecutionNodeRestricted(processorNode.isExecutionNodeRestricted());
         processorDTO.setExtensionMissing(processorNode.isExtensionMissing());
 
         processorDTO.setType(processorNode.getCanonicalClassName());
@@ -813,6 +822,7 @@ public class TestFlowController {
         
processorDTO.setInputRequirement(processorNode.getInputRequirement().name());
         
processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class));
         processorDTO.setRestricted(processorNode.isRestricted());
+        
processorDTO.setExecutionNodeRestricted(processorNode.isExecutionNodeRestricted());
         processorDTO.setExtensionMissing(processorNode.isExtensionMissing());
 
         processorDTO.setType(processorNode.getCanonicalClassName());
@@ -868,6 +878,7 @@ public class TestFlowController {
         
processorDTO.setInputRequirement(processorNode.getInputRequirement().name());
         
processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class));
         processorDTO.setRestricted(processorNode.isRestricted());
+        
processorDTO.setExecutionNodeRestricted(processorNode.isExecutionNodeRestricted());
         processorDTO.setExtensionMissing(processorNode.isExtensionMissing());
 
         processorDTO.setType(processorNode.getCanonicalClassName());

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index e3be8d7..4016d66 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -2786,6 +2786,7 @@ public final class DtoFactory {
         
dto.setPersistsState(node.getProcessor().getClass().isAnnotationPresent(Stateful.class));
         dto.setRestricted(node.isRestricted());
         dto.setDeprecated(node.isDeprecated());
+        dto.setExecutionNodeRestricted(node.isExecutionNodeRestricted());
         dto.setExtensionMissing(node.isExtensionMissing());
         dto.setMultipleVersionsAvailable(compatibleBundles.size() > 1);
         
dto.setVersionedComponentId(node.getVersionedComponentId().orElse(null));
@@ -3767,6 +3768,7 @@ public final class DtoFactory {
         copy.setSupportsEventDriven(original.getSupportsEventDriven());
         copy.setSupportsBatching(original.getSupportsBatching());
         copy.setPersistsState(original.getPersistsState());
+        copy.setExecutionNodeRestricted(original.isExecutionNodeRestricted());
         copy.setExtensionMissing(original.getExtensionMissing());
         
copy.setMultipleVersionsAvailable(original.getMultipleVersionsAvailable());
         copy.setValidationErrors(copy(original.getValidationErrors()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp
index a0ce8c4..b7917a1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp
@@ -170,7 +170,7 @@
                         <div class="execution-node-setting">
                             <div class="setting-name">
                                 Execution
-                                <div class="fa fa-question-circle" alt="Info" 
title="The node(s) that this processor will be scheduled to run on."></div>
+                                <div class="fa fa-question-circle" alt="Info" 
title="The node(s) that this processor will be scheduled to run on when 
clustered."></div>
                             </div>
                             <div class="setting-field">
                                 <div id="execution-node-combo"></div>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js
index 90667f0..f6c69a6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js
@@ -128,12 +128,12 @@
         return [{
             text: 'All nodes',
             value: 'ALL',
-            description: 'Processor will be scheduled to run on all nodes'
+            description: 'Processor will be scheduled to run on all nodes',
+            disabled: processor.executionNodeRestricted === true
         }, {
             text: 'Primary node',
             value: 'PRIMARY',
-            description: 'Processor will be scheduled to run only on the 
primary node',
-            disabled: !nfClusterSummary.isClustered() && 
processor.config['executionNode'] === 'PRIMARY'
+            description: 'Processor will be scheduled to run only on the 
primary node'
         }];
     };
 
@@ -741,12 +741,7 @@
                         }
                     });
 
-                    // show the execution node option if we're cluster or 
we're currently configured to run on the primary node only
-                    if (nfClusterSummary.isClustered() || executionNode === 
'PRIMARY') {
-                        $('#execution-node-options').show();
-                    } else {
-                        $('#execution-node-options').hide();
-                    }
+                    $('#execution-node-options').show();
 
                     // initialize the concurrentTasks
                     var defaultConcurrentTasks = 
processor.config['defaultConcurrentTasks'];

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js
index 69969e3..2a956b6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js
@@ -216,20 +216,16 @@
 
                     var executionNode = details.config['executionNode'];
 
-                    // only show the execution-node when applicable
-                    if (nfClusterSummary.isClustered() || executionNode === 
'PRIMARY') {
-                        if (executionNode === 'ALL') {
-                            executionNode = "All nodes";
-                        } else if (executionNode === 'PRIMARY') {
-                            executionNode = "Primary node only";
-                        }
-                        nfCommon.populateField('read-only-execution-node', 
executionNode);
-
-                        $('#read-only-execution-node-options').show();
-                    } else {
-                        $('#read-only-execution-node-options').hide();
+                    if (executionNode === 'ALL') {
+                        executionNode = "All nodes";
+                    } else if (executionNode === 'PRIMARY') {
+                        executionNode = "Primary node only";
                     }
 
+                    nfCommon.populateField('read-only-execution-node', 
executionNode);
+
+                    $('#read-only-execution-node-options').show();
+
                     // load the relationship list
                     if (!nfCommon.isEmpty(details.relationships)) {
                         $.each(details.relationships, function (i, 
relationship) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 46b48fb..e018814 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -24,6 +24,7 @@ import com.google.cloud.storage.Storage;
 import com.google.common.collect.ImmutableList;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.Stateful;
@@ -103,6 +104,7 @@ import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
 /**
  * List objects in a google cloud storage bucket by object name pattern.
  */
+@PrimaryNodeOnly
 @TriggerSerially
 @TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index bd30ca1..612247d 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
@@ -62,6 +63,7 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+@PrimaryNodeOnly
 @TriggerSerially
 @TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
index 5f2cd8a..a93f2de 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
@@ -43,6 +43,7 @@ import javax.jms.Session;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -61,6 +62,7 @@ import org.apache.nifi.processors.standard.util.JmsFactory;
 import org.apache.nifi.processors.standard.util.JmsProperties;
 import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
 
+@PrimaryNodeOnly
 @Deprecated
 @DeprecationNotice(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS"}, 
reason = "This processor is deprecated and may be removed in future releases.")
 @TriggerSerially

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
index f9116c5..ec2d3c1 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -59,6 +60,7 @@ import java.util.stream.Stream;
 /**
  * A processor to retrieve a list of tables (and their metadata) from a 
database connection
  */
+@PrimaryNodeOnly
 @TriggerSerially
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"sql", "list", "jdbc", "table", "database"})

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
index 79a4177..6b6a9a7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -37,6 +38,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
 
+@PrimaryNodeOnly
 @TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"list", "ftp", "remote", "ingest", "source", "input", "files"})

http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index ac1a42d..7af8766 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -39,6 +40,7 @@ import org.apache.nifi.processors.standard.util.FTPTransfer;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
 
+@PrimaryNodeOnly
 @TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"list", "sftp", "remote", "ingest", "source", "input", "files"})

Reply via email to