Repository: nifi Updated Branches: refs/heads/master dbef69040 -> 0973c2d8d
NIFI-543 Added annotation to restrict processor to run only on the primary node - PR Fix - 'Execution' dropdown will now be shown in all cases - Annotated ListGCSBucket with PrimaryNodeOnly This closes #2509. Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0973c2d8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0973c2d8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0973c2d8 Branch: refs/heads/master Commit: 0973c2d8d13940323ebb30eaa383608bf9e7cbb5 Parents: dbef690 Author: zenfenan <[email protected]> Authored: Sat Mar 3 21:14:51 2018 +0530 Committer: Mark Payne <[email protected]> Committed: Fri May 25 11:50:29 2018 -0400 ---------------------------------------------------------------------- .../annotation/behavior/PrimaryNodeOnly.java | 36 ++++++++++++++++++++ .../src/main/asciidoc/developer-guide.adoc | 6 ++++ .../apache/nifi/processors/aws/s3/ListS3.java | 2 ++ .../azure/storage/ListAzureBlobStorage.java | 2 ++ .../nifi/web/api/dto/ProcessorConfigDTO.java | 2 +- .../apache/nifi/web/api/dto/ProcessorDTO.java | 14 ++++++++ .../apache/nifi/controller/ProcessorNode.java | 2 ++ .../nifi/controller/ProcessorDetails.java | 7 ++++ .../nifi/controller/StandardProcessorNode.java | 16 +++++++-- .../DummyPrimaryNodeOnlyProcessor.java | 35 +++++++++++++++++++ .../nifi/controller/TestFlowController.java | 11 ++++++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 2 ++ .../partials/canvas/processor-configuration.jsp | 2 +- .../js/nf/canvas/nf-processor-configuration.js | 13 +++---- .../main/webapp/js/nf/nf-processor-details.js | 20 +++++------ .../processors/gcp/storage/ListGCSBucket.java | 2 ++ .../apache/nifi/processors/hadoop/ListHDFS.java | 2 ++ .../nifi/processors/standard/GetJMSTopic.java | 2 ++ .../processors/standard/ListDatabaseTables.java | 2 ++ .../nifi/processors/standard/ListFTP.java | 2 ++ .../nifi/processors/standard/ListSFTP.java | 2 ++ 21 files changed, 157 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/PrimaryNodeOnly.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/PrimaryNodeOnly.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/PrimaryNodeOnly.java new file mode 100644 index 0000000..19d43c2 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/PrimaryNodeOnly.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a {@link org.apache.nifi.processor.Processor} implementation + * can use to indicate that the {@link org.apache.nifi.scheduling.ExecutionNode} + * of the processor is to be set to PRIMARY + */ +@Documented +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface PrimaryNodeOnly { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-docs/src/main/asciidoc/developer-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc index bde08be..97ac234 100644 --- a/nifi-docs/src/main/asciidoc/developer-guide.adoc +++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc @@ -1751,6 +1751,12 @@ will handle your Processor: will always be set to `1`. This does *not*, however, mean that the Processor does not have to be thread-safe, as the thread that is executing `onTrigger` may change between invocations. + - `PrimaryNodeOnly`: Apache NiFi, when clustered, offers two modes of execution for Processors: "Primary Node" and + "All Nodes". Although running in all the nodes offers better parallelism, some Processors are known to cause unintended + behaviors when run in multiple nodes. For instance, some Processors list or read files from remote filesystems. If such + Processors are scheduled to run on "All Nodes", it will cause unnecessary duplication and even errors. Such Processors + should use this annotation. Applying this annotation will restrict the Processor to run only on the "Primary Node". + - `TriggerWhenAnyDestinationAvailable`: By default, NiFi will not schedule a Processor to run if any of its outbound queues is full. This allows back-pressure to be applied all the way a chain of Processors. However, some Processors may need to run even if one of the outbound queues is full. This annotations indicates that the Processor should run http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index 0fd5ab7..3e037f8 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; @@ -58,6 +59,7 @@ import com.amazonaws.services.s3.model.VersionListing; import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; +@PrimaryNodeOnly @TriggerSerially @TriggerWhenEmpty @InputRequirement(Requirement.INPUT_FORBIDDEN) http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index d9df136..9f178fa 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -28,6 +28,7 @@ import com.microsoft.azure.storage.blob.ListBlobItem; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -57,6 +58,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +@PrimaryNodeOnly @TriggerSerially @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) @SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class, DeleteAzureBlobStorage.class }) http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java index 476c039..780e8ac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java @@ -107,7 +107,7 @@ public class ProcessorConfigDTO { * @return the amount of time that is used when this processor penalizes a flowfile */ @ApiModelProperty( - value = "The amout of time that is used when the process penalizes a flowfile." + value = "The amount of time that is used when the process penalizes a flowfile." ) public String getPenaltyDuration() { return penaltyDuration; http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java index 7fcdbbc..cd1fe41 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java @@ -46,6 +46,7 @@ public class ProcessorDTO extends ComponentDTO { private Boolean restricted; private Boolean deprecated; private Boolean isExtensionMissing; + private Boolean executionNodeRestricted; private Boolean multipleVersionsAvailable; private String inputRequirement; @@ -335,4 +336,17 @@ public class ProcessorDTO extends ComponentDTO { this.description = description; } + /** + * @return whether or not this processor is restricted to run only in primary node + */ + @ApiModelProperty( + value = "Indicates if the execution node of a processor is restricted to run only on the primary node" + ) + public Boolean isExecutionNodeRestricted() { + return executionNodeRestricted; + } + + public void setExecutionNodeRestricted(Boolean executionNodeRestricted) { + this.executionNodeRestricted = executionNodeRestricted; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 12c5ffc..cfe979a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -68,6 +68,8 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con public abstract boolean isEventDrivenSupported(); + public abstract boolean isExecutionNodeRestricted(); + public abstract Requirement getInputRequirement(); public abstract List<ActiveThreadInfo> getActiveThreads(); http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java index 2a3e062..183dbaf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -39,6 +40,7 @@ public class ProcessorDetails { private final boolean triggerWhenAnyDestinationAvailable; private final boolean eventDrivenSupported; private final boolean batchSupported; + private final boolean executionNodeRestricted; private final InputRequirement.Requirement inputRequirement; private final TerminationAwareLogger componentLog; private final BundleCoordinate bundleCoordinate; @@ -55,6 +57,7 @@ public class ProcessorDetails { this.triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class); this.triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class); this.eventDrivenSupported = procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty; + this.executionNodeRestricted = procClass.isAnnotationPresent(PrimaryNodeOnly.class); final boolean inputRequirementPresent = procClass.isAnnotationPresent(InputRequirement.class); if (inputRequirementPresent) { @@ -96,6 +99,10 @@ public class ProcessorDetails { return batchSupported; } + public boolean isExecutionNodeRestricted(){ + return executionNodeRestricted; + } + public InputRequirement.Requirement getInputRequirement() { return inputRequirement; } http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index c8e0570..5d6b3da 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -193,7 +193,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable onScheduleTimeoutMillis = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; - executionNode = ExecutionNode.ALL; + executionNode = isExecutionNodeRestricted() ? ExecutionNode.PRIMARY : ExecutionNode.ALL; this.hashCode = new HashCodeBuilder(7, 67).append(identifier).toHashCode(); try { @@ -365,6 +365,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** + * Indicates whether the processor's executionNode configuration is restricted to run only in primary node + */ + @Override + public boolean isExecutionNodeRestricted(){ + return processorRef.get().isExecutionNodeRestricted(); + } + + /** * Indicates whether flow file content made by this processor must be * persisted * @@ -530,7 +538,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public synchronized void setExecutionNode(final ExecutionNode executionNode) { - this.executionNode = executionNode; + if (this.isExecutionNodeRestricted()) { + this.executionNode = ExecutionNode.PRIMARY; + } else { + this.executionNode = executionNode; + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyPrimaryNodeOnlyProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyPrimaryNodeOnlyProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyPrimaryNodeOnlyProcessor.java new file mode 100644 index 0000000..572d655 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyPrimaryNodeOnlyProcessor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + + import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.exception.ProcessException; + +/** + * Dummy processor implementation to test {@link PrimaryNodeOnly} marker annotation + */ +@PrimaryNodeOnly +public class DummyPrimaryNodeOnlyProcessor extends AbstractProcessor{ + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index fd3f83b..d34a6de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -52,6 +52,7 @@ import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.BundleDTO; @@ -524,6 +525,13 @@ public class TestFlowController { } @Test + public void testPrimaryNodeOnlyAnnotation() throws ProcessorInstantiationException { + String id = UUID.randomUUID().toString(); + ProcessorNode processorNode = controller.createProcessor(DummyPrimaryNodeOnlyProcessor.class.getName(), id, systemBundle.getBundleDetails().getCoordinate()); + assertEquals(ExecutionNode.PRIMARY, processorNode.getExecutionNode()); + } + + @Test public void testDeleteProcessGroup() { ProcessGroup pg = controller.createProcessGroup("my-process-group"); pg.setName("my-process-group"); @@ -760,6 +768,7 @@ public class TestFlowController { processorDTO.setInputRequirement(processorNode.getInputRequirement().name()); processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class)); processorDTO.setRestricted(processorNode.isRestricted()); + processorDTO.setExecutionNodeRestricted(processorNode.isExecutionNodeRestricted()); processorDTO.setExtensionMissing(processorNode.isExtensionMissing()); processorDTO.setType(processorNode.getCanonicalClassName()); @@ -813,6 +822,7 @@ public class TestFlowController { processorDTO.setInputRequirement(processorNode.getInputRequirement().name()); processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class)); processorDTO.setRestricted(processorNode.isRestricted()); + processorDTO.setExecutionNodeRestricted(processorNode.isExecutionNodeRestricted()); processorDTO.setExtensionMissing(processorNode.isExtensionMissing()); processorDTO.setType(processorNode.getCanonicalClassName()); @@ -868,6 +878,7 @@ public class TestFlowController { processorDTO.setInputRequirement(processorNode.getInputRequirement().name()); processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class)); processorDTO.setRestricted(processorNode.isRestricted()); + processorDTO.setExecutionNodeRestricted(processorNode.isExecutionNodeRestricted()); processorDTO.setExtensionMissing(processorNode.isExtensionMissing()); processorDTO.setType(processorNode.getCanonicalClassName()); http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index e3be8d7..4016d66 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -2786,6 +2786,7 @@ public final class DtoFactory { dto.setPersistsState(node.getProcessor().getClass().isAnnotationPresent(Stateful.class)); dto.setRestricted(node.isRestricted()); dto.setDeprecated(node.isDeprecated()); + dto.setExecutionNodeRestricted(node.isExecutionNodeRestricted()); dto.setExtensionMissing(node.isExtensionMissing()); dto.setMultipleVersionsAvailable(compatibleBundles.size() > 1); dto.setVersionedComponentId(node.getVersionedComponentId().orElse(null)); @@ -3767,6 +3768,7 @@ public final class DtoFactory { copy.setSupportsEventDriven(original.getSupportsEventDriven()); copy.setSupportsBatching(original.getSupportsBatching()); copy.setPersistsState(original.getPersistsState()); + copy.setExecutionNodeRestricted(original.isExecutionNodeRestricted()); copy.setExtensionMissing(original.getExtensionMissing()); copy.setMultipleVersionsAvailable(original.getMultipleVersionsAvailable()); copy.setValidationErrors(copy(original.getValidationErrors())); http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp index a0ce8c4..b7917a1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp @@ -170,7 +170,7 @@ <div class="execution-node-setting"> <div class="setting-name"> Execution - <div class="fa fa-question-circle" alt="Info" title="The node(s) that this processor will be scheduled to run on."></div> + <div class="fa fa-question-circle" alt="Info" title="The node(s) that this processor will be scheduled to run on when clustered."></div> </div> <div class="setting-field"> <div id="execution-node-combo"></div> http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js index 90667f0..f6c69a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js @@ -128,12 +128,12 @@ return [{ text: 'All nodes', value: 'ALL', - description: 'Processor will be scheduled to run on all nodes' + description: 'Processor will be scheduled to run on all nodes', + disabled: processor.executionNodeRestricted === true }, { text: 'Primary node', value: 'PRIMARY', - description: 'Processor will be scheduled to run only on the primary node', - disabled: !nfClusterSummary.isClustered() && processor.config['executionNode'] === 'PRIMARY' + description: 'Processor will be scheduled to run only on the primary node' }]; }; @@ -741,12 +741,7 @@ } }); - // show the execution node option if we're cluster or we're currently configured to run on the primary node only - if (nfClusterSummary.isClustered() || executionNode === 'PRIMARY') { - $('#execution-node-options').show(); - } else { - $('#execution-node-options').hide(); - } + $('#execution-node-options').show(); // initialize the concurrentTasks var defaultConcurrentTasks = processor.config['defaultConcurrentTasks']; http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js index 69969e3..2a956b6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js @@ -216,20 +216,16 @@ var executionNode = details.config['executionNode']; - // only show the execution-node when applicable - if (nfClusterSummary.isClustered() || executionNode === 'PRIMARY') { - if (executionNode === 'ALL') { - executionNode = "All nodes"; - } else if (executionNode === 'PRIMARY') { - executionNode = "Primary node only"; - } - nfCommon.populateField('read-only-execution-node', executionNode); - - $('#read-only-execution-node-options').show(); - } else { - $('#read-only-execution-node-options').hide(); + if (executionNode === 'ALL') { + executionNode = "All nodes"; + } else if (executionNode === 'PRIMARY') { + executionNode = "Primary node only"; } + nfCommon.populateField('read-only-execution-node', executionNode); + + $('#read-only-execution-node-options').show(); + // load the relationship list if (!nfCommon.isEmpty(details.relationships)) { $.each(details.relationships, function (i, relationship) { http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java index 46b48fb..e018814 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java @@ -24,6 +24,7 @@ import com.google.cloud.storage.Storage; import com.google.common.collect.ImmutableList; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.Stateful; @@ -103,6 +104,7 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC; /** * List objects in a google cloud storage bucket by object name pattern. */ +@PrimaryNodeOnly @TriggerSerially @TriggerWhenEmpty @InputRequirement(Requirement.INPUT_FORBIDDEN) http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index bd30ca1..612247d 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; @@ -62,6 +63,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +@PrimaryNodeOnly @TriggerSerially @TriggerWhenEmpty @InputRequirement(Requirement.INPUT_FORBIDDEN) http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java index 5f2cd8a..a93f2de 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java @@ -43,6 +43,7 @@ import javax.jms.Session; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -61,6 +62,7 @@ import org.apache.nifi.processors.standard.util.JmsFactory; import org.apache.nifi.processors.standard.util.JmsProperties; import org.apache.nifi.processors.standard.util.WrappedMessageConsumer; +@PrimaryNodeOnly @Deprecated @DeprecationNotice(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS"}, reason = "This processor is deprecated and may be removed in future releases.") @TriggerSerially http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java index f9116c5..ec2d3c1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -59,6 +60,7 @@ import java.util.stream.Stream; /** * A processor to retrieve a list of tables (and their metadata) from a database connection */ +@PrimaryNodeOnly @TriggerSerially @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"sql", "list", "jdbc", "table", "database"}) http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java index 79a4177..6b6a9a7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -37,6 +38,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FTPTransfer; +@PrimaryNodeOnly @TriggerSerially @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"list", "ftp", "remote", "ingest", "source", "input", "files"}) http://git-wip-us.apache.org/repos/asf/nifi/blob/0973c2d8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index ac1a42d..7af8766 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -39,6 +40,7 @@ import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; +@PrimaryNodeOnly @TriggerSerially @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"list", "sftp", "remote", "ingest", "source", "input", "files"})
