Repository: nifi Updated Branches: refs/heads/master 4b4e099f2 -> bff89f17b
NIFI-401 Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7eca2037 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7eca2037 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7eca2037 Branch: refs/heads/master Commit: 7eca2037bd602ecd2b26281d91ac3af9bc72a3f5 Parents: 4b4e099 Author: Brian Eugley <[email protected]> Authored: Thu Jun 9 11:32:15 2016 -0400 Committer: jpercivall <[email protected]> Committed: Tue Nov 8 17:06:36 2016 -0500 ---------------------------------------------------------------------- .../apache/nifi/scheduling/ExecutionNode.java | 22 +++++++ .../nifi/scheduling/SchedulingStrategy.java | 4 ++ .../nifi/web/api/dto/ProcessorConfigDTO.java | 17 ++++++ .../apache/nifi/controller/ProcessorNode.java | 5 ++ .../apache/nifi/controller/FlowController.java | 7 +++ .../controller/StandardFlowSynchronizer.java | 5 ++ .../nifi/controller/StandardProcessorNode.java | 15 +++++ .../serialization/FlowFromDOMFactory.java | 9 +++ .../serialization/StandardFlowSerializer.java | 1 + .../nifi/fingerprint/FingerprintFactory.java | 1 + .../src/main/resources/FlowConfiguration.xsd | 11 +++- .../org/apache/nifi/audit/ProcessorAuditor.java | 4 ++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 2 + .../nifi/web/controller/ControllerFacade.java | 7 +++ .../nifi/web/dao/impl/StandardProcessorDAO.java | 13 +++++ .../partials/canvas/processor-configuration.jsp | 14 ++++- .../WEB-INF/partials/processor-details.jsp | 14 ++++- .../main/webapp/css/processor-configuration.css | 8 ++- .../src/main/webapp/css/processor-details.css | 4 +- .../js/nf/canvas/nf-processor-configuration.js | 60 +++++++++++++------- .../main/webapp/js/nf/nf-processor-details.js | 25 +++++++- 21 files changed, 220 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java b/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java new file mode 100644 index 0000000..f479732 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.scheduling; + +public enum ExecutionNode { + ALL, + PRIMARY; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java b/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java index ccf4281..21bf130 100644 --- a/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java +++ b/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java @@ -56,6 +56,10 @@ public enum SchedulingStrategy { */ TIMER_DRIVEN(1, "0 sec"), /** + * NOTE: This option has been deprecated with the addition of the + * execution-node combo box. It still exists for backward compatibility + * with existing flows that still have this value for schedulingStrategy. + * * Indicates that the component will be scheduled via timer only on the * Primary Node. 
If the instance is not part of a cluster and this * Scheduling Strategy is used, the component will be scheduled in the same http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java index a9006c9..1e4e92f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java @@ -34,6 +34,7 @@ public class ProcessorConfigDTO { // settings private String schedulingPeriod; private String schedulingStrategy; + private String executionNode; private String penaltyDuration; private String yieldDuration; private String bulletinLevel; @@ -87,6 +88,22 @@ public class ProcessorConfigDTO { } /** + * Indicates which node the process should run on + * + * @return execution node + */ + @ApiModelProperty( + value = "Indicates the node where the process will execute." 
+ ) + public String getExecutionNode() { + return executionNode; + } + + public void setExecutionNode(String executionNode) { + this.executionNode = executionNode; + } + + /** * @return the amount of time that is used when this processor penalizes a flowfile */ @ApiModelProperty( http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 08b4abe..8f405e0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -35,6 +35,7 @@ import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +88,10 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen @Override public abstract SchedulingStrategy getSchedulingStrategy(); + public abstract void setExecutionNode(ExecutionNode executionNode); + + public abstract ExecutionNode getExecutionNode(); + public abstract void setRunDuration(long duration, TimeUnit timeUnit); public abstract long getRunDuration(TimeUnit timeUnit); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ba5ed36..5f1d34c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -207,6 +207,7 @@ import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.ComponentIdGenerator; @@ -481,6 +482,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties); processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); + // PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been + // re-configured with executeNode = 
PRIMARY). processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS); @@ -1709,6 +1712,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); } + if (config.getExecutionNode() != null) { + procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode())); + } + // ensure that the scheduling strategy is set prior to these values procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); procNode.setScheduldingPeriod(config.getSchedulingPeriod()); http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 8cfb3f3..1d7ebde 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -88,6 +88,7 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.Severity; import 
org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.util.DomUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; @@ -902,6 +903,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); } + if (config.getExecutionNode() != null) { + procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode())); + } + // must set scheduling strategy before these two procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); procNode.setScheduldingPeriod(config.getSchedulingPeriod()); http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 5ff9f22..a765577 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -56,6 +56,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import 
org.apache.nifi.util.ReflectionUtils; @@ -134,6 +135,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private SchedulingStrategy schedulingStrategy; // guarded by read/write lock // ??????? NOT any more + private ExecutionNode executionNode; public StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, @@ -190,6 +192,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; + executionNode = ExecutionNode.ALL; } /** @@ -426,6 +429,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } this.schedulingStrategy = schedulingStrategy; + // PRIMARY_NODE_ONLY is deprecated. Isolated is also set when executionNode == PRIMARY setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY); } @@ -477,6 +481,17 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override + public void setExecutionNode(final ExecutionNode executionNode) { + this.executionNode = executionNode; + setIsolated(executionNode == ExecutionNode.PRIMARY); + } + + @Override + public ExecutionNode getExecutionNode() { + return this.executionNode; + } + + @Override public long getRunDuration(final TimeUnit timeUnit) { return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS); } http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 2c51e96..f1e4232 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -32,6 +32,7 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.util.DomUtils; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -374,6 +375,14 @@ public class FlowFromDOMFactory { configDto.setSchedulingStrategy(schedulingStrategyName.trim()); } + // handle execution node + final String executionNode = getString(element, "executionNode"); + if (executionNode == null || executionNode.trim().isEmpty()) { + configDto.setExecutionNode(ExecutionNode.ALL.name()); + } else { + configDto.setExecutionNode(executionNode.trim()); + } + final Long runDurationNanos = getOptionalLong(element, "runDurationNanos"); if (runDurationNanos != null) { configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos)); http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index d04433c..0ead668 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -344,6 +344,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant())); addTextElement(element, "scheduledState", processor.getScheduledState().name()); addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name()); + addTextElement(element, "executionNode", processor.getExecutionNode().name()); addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor); http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index f3a4cbb..0ab39a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -519,6 +519,7 @@ public class FingerprintFactory { builder.append(config.getComments()); builder.append(config.getSchedulingPeriod()); builder.append(config.getSchedulingStrategy()); + builder.append(config.getExecutionNode()); builder.append(config.getYieldDuration()); builder.append(config.getConcurrentlySchedulableTaskCount()); builder.append(config.getPenaltyDuration()); http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index c1e66e5..adc83be 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -87,6 +87,8 @@ <xs:element name="schedulingStrategy" type="SchedulingStrategy" minOccurs="0" maxOccurs="1" /> + <xs:element name="executionNode" type="ExecutionNode" minOccurs="0" maxOccurs="1" /> + <xs:element name="runDurationNanos" type="xs:long" minOccurs="0" maxOccurs="1" /> <!-- properties that must be valid for the processor to execute. 
@@ -337,6 +339,13 @@ </xs:restriction> </xs:simpleType> + <xs:simpleType name="ExecutionNode"> + <xs:restriction base="xs:string"> + <xs:enumeration value="ALL"></xs:enumeration> + <xs:enumeration value="PRIMARY"></xs:enumeration> + </xs:restriction> + </xs:simpleType> + <xs:complexType name="ControllerServicesType"> <xs:sequence> <xs:element name="controllerService" type="ControllerServiceType" minOccurs="0" maxOccurs="unbounded" /> @@ -376,4 +385,4 @@ <xs:element name="annotationData" type="xs:string" minOccurs="0" maxOccurs="1" /> </xs:sequence> </xs:complexType> -</xs:schema> \ No newline at end of file +</xs:schema> http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java index 4bae6a6..d24ab85 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java @@ -68,6 +68,7 @@ public class ProcessorAuditor extends NiFiAuditor { private static final String AUTO_TERMINATED_RELATIONSHIPS = "Auto Terminated Relationships"; private static final String SCHEDULING_PERIOD = "Run Schedule"; private static final String SCHEDULING_STRATEGY = "Scheduling Strategy"; + private static final String EXECUTION_NODE = "Execution Node"; /** * Audits the creation of processors via createProcessor(). 
@@ -369,6 +370,9 @@ public class ProcessorAuditor extends NiFiAuditor { if (newConfig.getSchedulingStrategy() != null) { values.put(SCHEDULING_STRATEGY, processor.getSchedulingStrategy().toString()); } + if (newConfig.getExecutionNode() != null) { + values.put(EXECUTION_NODE, processor.getExecutionNode().name()); + } } return values; http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 39d09e9..c39af2f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -2492,6 +2492,7 @@ public final class DtoFactory { dto.setComments(procNode.getComments()); dto.setBulletinLevel(procNode.getBulletinLevel().name()); dto.setSchedulingStrategy(procNode.getSchedulingStrategy().name()); + dto.setExecutionNode(procNode.getExecutionNode().name()); dto.setAnnotationData(procNode.getAnnotationData()); // set up the default values for concurrent tasks and scheduling period @@ -2670,6 +2671,7 @@ public final class DtoFactory { copy.setAutoTerminatedRelationships(copy(original.getAutoTerminatedRelationships())); copy.setComments(original.getComments()); copy.setSchedulingStrategy(original.getSchedulingStrategy()); + copy.setExecutionNode(original.getExecutionNode()); copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount()); 
copy.setCustomUiUrl(original.getCustomUiUrl()); copy.setDescriptors(copy(original.getDescriptors())); http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index e250231..339e3c2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -82,6 +82,7 @@ import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.search.SearchContext; import org.apache.nifi.search.SearchResult; import org.apache.nifi.search.Searchable; @@ -1589,9 +1590,15 @@ public class ControllerFacade implements Authorizable { } else if (SchedulingStrategy.TIMER_DRIVEN.equals(procNode.getSchedulingStrategy()) && StringUtils.containsIgnoreCase("timer", searchStr)) { matches.add("Scheduling strategy: Timer driven"); } else if (SchedulingStrategy.PRIMARY_NODE_ONLY.equals(procNode.getSchedulingStrategy()) && StringUtils.containsIgnoreCase("primary", searchStr)) { + // PRIMARY_NODE_ONLY has been deprecated as a SchedulingStrategy and replaced by PRIMARY as an ExecutionNode. 
matches.add("Scheduling strategy: On primary node"); } + // consider execution node + if (ExecutionNode.PRIMARY.equals(procNode.getExecutionNode()) && StringUtils.containsIgnoreCase("primary", searchStr)) { + matches.add("Execution node: primary"); + } + // consider scheduled state if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) { if (StringUtils.containsIgnoreCase("disabled", searchStr)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index f42bc7b..fcd1e53 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -31,6 +31,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.Relationship; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.ResourceNotFoundException; @@ -117,6 +118,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { if (isNotNull(config)) { // perform the configuration final String schedulingStrategy = config.getSchedulingStrategy(); + final String executionNode = config.getExecutionNode(); 
final String comments = config.getComments(); final String annotationData = config.getAnnotationData(); final Integer maxTasks = config.getConcurrentlySchedulableTaskCount(); @@ -133,6 +135,9 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { processor.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy)); } + if (isNotNull(executionNode)) { + processor.setExecutionNode(ExecutionNode.valueOf(executionNode)); + } if (isNotNull(comments)) { processor.setComments(comments); } @@ -211,6 +216,13 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { validationErrors.add(String.format("Bulletin level: Value must be one of [%s]", StringUtils.join(LogLevel.values(), ", "))); } } + if (isNotNull(config.getExecutionNode())) { + try { + ExecutionNode.valueOf(config.getExecutionNode()); + } catch (IllegalArgumentException iae) { + validationErrors.add(String.format("Execution node: Value must be one of [%s]", StringUtils.join(ExecutionNode.values(), ", "))); + } + } // get the current scheduling strategy SchedulingStrategy schedulingStrategy = processorNode.getSchedulingStrategy(); @@ -345,6 +357,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { configDTO.getProperties(), configDTO.getSchedulingPeriod(), configDTO.getSchedulingStrategy(), + configDTO.getExecutionNode(), configDTO.getYieldDuration())) { modificationRequest = true; http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp index e59c233..438d591 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp @@ -108,6 +108,18 @@ </div> <div class="clear"></div> </div> + <div class="setting"> + <div id="execution-node-container" class="execution-node-setting"> + <div class="setting-name"> + Execution node + <img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The node(s) that will run this processor."/> + </div> + <div class="setting-field"> + <div id="execution-node-combo"></div> + </div> + </div> + <div class="clear"></div> + </div> <div id="timer-driven-options" class="setting"> <div class="concurrently-schedulable-tasks-setting"> <div class="setting-name"> @@ -208,4 +220,4 @@ </div> </div> </div> -<div id="new-processor-property-container"></div> \ No newline at end of file +<div id="new-processor-property-container"></div> http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/processor-details.jsp ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/processor-details.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/processor-details.jsp index 6f14edd..3210a40 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/processor-details.jsp +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/processor-details.jsp @@ -103,6 +103,18 @@ <div class="clear"></div> </div> <div class="setting"> + <div id="details-execution-node-container" class="execution-node-setting"> + <div class="setting-name"> + Execution node + <img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The node(s) that will run this processor."/> + </div> + <div class="setting-field"> + <span id="read-only-execution-node"></span> + </div> + </div> + <div class="clear"></div> + </div> + <div class="setting"> <div class="concurrently-schedulable-tasks-setting"> <div class="setting-name"> Concurrent tasks @@ -151,4 +163,4 @@ </div> </div> </div> -</div> \ No newline at end of file +</div> http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-configuration.css ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-configuration.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-configuration.css index 298f261..c8b5843 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-configuration.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-configuration.css @@ -55,6 +55,12 @@ div.processor-enabled-container { width: 20%; } +#execution-node-combo { + width: 130px; + height: 18px; + line-height: 18px; +} + #event-driven-warning { padding-top: 22px; color: #f00; @@ -185,4 +191,4 @@ div.relationship-description { #processor-comments { height: 100%; -} \ No newline at end of file +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-details.css ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-details.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-details.css index 36fabda..ed4c0fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-details.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-details.css @@ -50,11 +50,11 @@ width: 340px; } -div.concurrently-schedulable-tasks-setting, div.scheduling-period-setting, div.penalty-duration-setting, div.yield-duration-setting, div.scheduling-strategy-setting, div.bulletin-setting { +div.concurrently-schedulable-tasks-setting, div.scheduling-period-setting, div.penalty-duration-setting, div.yield-duration-setting, div.scheduling-strategy-setting, div.execution-node-setting, div.bulletin-setting { float: left; width: 40%; } div.scheduling-period-setting, div.yield-duration-setting { margin-left: 20%; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js index 99676c6..eb0df92 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js @@ -52,22 +52,6 @@ nf.ProcessorConfiguration = (function () { }); } - // conditionally support event driven - if (nf.Canvas.isClustered()) { - strategies.push({ - text: 'On primary node', - value: 'PRIMARY_NODE_ONLY', - description: 'Processor will be scheduled on the primary node on an interval defined by the run schedule.' - }); - } else if (processor.config['schedulingStrategy'] === 'PRIMARY_NODE_ONLY') { - strategies.push({ - text: 'On primary node', - value: 'PRIMARY_NODE_ONLY', - description: 'Processor will be scheduled on the primary node on an interval defined by the run schedule.', - disabled: true - }); - } - // add an option for cron driven strategies.push({ text: 'CRON driven', @@ -200,6 +184,9 @@ nf.ProcessorConfiguration = (function () { return true; } + if ($('#execution-node-combo').combo('getSelectedOption').value !== (details.config['executionNode'] + '')) { + return true; + } if ($('#processor-name').val() !== details['name']) { return true; } @@ -263,6 +250,7 @@ nf.ProcessorConfiguration = (function () { processorConfigDto['schedulingPeriod'] = schedulingPeriod.val(); } + processorConfigDto['executionNode'] = $('#execution-node-combo').combo('getSelectedOption').value; processorConfigDto['penaltyDuration'] = $('#penalty-duration').val(); processorConfigDto['yieldDuration'] = $('#yield-duration').val(); processorConfigDto['bulletinLevel'] = $('#bulletin-level-combo').combo('getSelectedOption').value; @@ -528,6 +516,19 @@ nf.ProcessorConfiguration = (function () { }] }); + // initialize the execution node combo + $('#execution-node-combo').combo({ + options: [{ + text: 'All nodes', + value: 'ALL', + description: 'Processor will be configured to run on all nodes' + }, { + 
text: 'Primary node only', + value: 'PRIMARY', + description: 'Processor will be configured to run only on the primary node' + }] + }); + // initialize the run duration slider $('#run-duration-slider').slider({ min: 0, @@ -629,11 +630,22 @@ nf.ProcessorConfiguration = (function () { value: processor.config['bulletinLevel'] }); + // If the scheduling strategy is PRIMARY_NODE_ONLY (deprecated), + // then set the execution node to PRIMARY and the scheduling + // strategy to TIMER. These new values will be saved when/if + // the dialog is applied. + var schedulingStrategy = processor.config['schedulingStrategy']; + var executionNode = processor.config['executionNode']; + if (schedulingStrategy === 'PRIMARY_NODE_ONLY') { + executionNode = 'PRIMARY'; + schedulingStrategy = 'TIMER_DRIVEN'; + } + // initialize the scheduling strategy $('#scheduling-strategy-combo').combo({ options: getSchedulingStrategies(processor), selectedOption: { - value: processor.config['schedulingStrategy'] + value: schedulingStrategy }, select: function (selectedOption) { // show the appropriate panel @@ -659,6 +671,16 @@ nf.ProcessorConfiguration = (function () { } }); + // select the execution node + $('#execution-node-combo').combo('setSelectedOption', { + value: executionNode + }); + if (nf.Canvas.isClustered()) { + $('#execution-node-container').show(); + } else { + $('#execution-node-container').hide(); + } + // initialize the concurrentTasks var defaultConcurrentTasks = processor.config['defaultConcurrentTasks']; $('#timer-driven-concurrently-schedulable-tasks').val(defaultConcurrentTasks['TIMER_DRIVEN']); @@ -667,9 +689,9 @@ nf.ProcessorConfiguration = (function () { // get the appropriate concurrent tasks field var concurrentTasks; - if (processor.config['schedulingStrategy'] === 'EVENT_DRIVEN') { + if (schedulingStrategy === 'EVENT_DRIVEN') { concurrentTasks = $('#event-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']); - } else if 
(processor.config['schedulingStrategy'] === 'CRON_DRIVEN') { + } else if (schedulingStrategy === 'CRON_DRIVEN') { concurrentTasks = $('#cron-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']); } else { concurrentTasks = $('#timer-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']); http://git-wip-us.apache.org/repos/asf/nifi/blob/7eca2037/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js index 175857b..f94ece0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js @@ -104,6 +104,7 @@ nf.ProcessorDetails = (function () { nf.Common.clearField('read-only-yield-duration'); nf.Common.clearField('read-only-run-duration'); nf.Common.clearField('read-only-bulletin-level'); + nf.Common.clearField('read-only-execution-node'); nf.Common.clearField('read-only-execution-status'); nf.Common.clearField('read-only-processor-comments'); @@ -159,6 +160,7 @@ nf.ProcessorDetails = (function () { // make the scheduling strategy human readable var schedulingStrategy = details.config['schedulingStrategy']; + var executionNode = details.config['executionNode']; if (schedulingStrategy === 'EVENT_DRIVEN') { showRunSchedule = false; schedulingStrategy = 'Event driven'; @@ -166,8 +168,12 @@ nf.ProcessorDetails = (function () { schedulingStrategy = 'CRON driven'; } else if (schedulingStrategy === 
'TIMER_DRIVEN') { schedulingStrategy = "Timer driven"; - } else { - schedulingStrategy = "On primary node"; + } else if (schedulingStrategy === 'PRIMARY_NODE_ONLY') { + // PRIMARY_NODE_ONLY has been deprecated as a + // schedulingStrategy option, and is now an + // executionNode option instead. + schedulingStrategy = "Timer driven"; + executionNode = 'PRIMARY' } nf.Common.populateField('read-only-scheduling-strategy', schedulingStrategy); @@ -178,6 +184,19 @@ nf.ProcessorDetails = (function () { $('#read-only-run-schedule').hide(); } + // only show the execution-node when applicable + if (executionNode === 'ALL') { + executionNode = "All nodes"; + } else if (executionNode === 'PRIMARY') { + executionNode = "Primary node only"; + } + nf.Common.populateField('read-only-execution-node', executionNode); + if (nf.Canvas.isClustered()) { + $('#details-execution-node-container').show(); + } else { + $('#details-execution-node-container').hide(); + } + // load the relationship list if (!nf.Common.isEmpty(details.relationships)) { $.each(details.relationships, function (i, relationship) { @@ -271,4 +290,4 @@ nf.ProcessorDetails = (function () { }); } }; -}()); \ No newline at end of file +}());
