http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java index 5acba8d..ea7cb96 100644 --- a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestStandardPreparedQuery.java @@ -17,9 +17,13 @@ package org.apache.nifi.attribute.expression.language; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.junit.Ignore; @@ -46,6 +50,7 @@ public class TestStandardPreparedQuery { } @Test + @Ignore("Intended for manual performance testing; should not be run in an automated environment") public void test10MIterations() { final Map<String, String> attrs = new HashMap<>(); attrs.put("xx", "world"); @@ -84,6 +89,42 @@ public class TestStandardPreparedQuery { } + @Test + public void testVariableImpacted() { + final Set<String> attr = new HashSet<>(); + attr.add("attr"); + + final Set<String> attr2 = new HashSet<>(); + attr2.add("attr"); + attr2.add("attr2"); + + final Set<String> abc = new HashSet<>(); + abc.add("a"); + abc.add("b"); + abc.add("c"); + + assertTrue(Query.prepare("${attr}").getVariableImpact().isImpacted("attr")); + assertFalse(Query.prepare("${attr}").getVariableImpact().isImpacted("attr2")); + 
assertTrue(Query.prepare("${attr:trim():toUpper():equals('abc')}").getVariableImpact().isImpacted("attr")); + + assertFalse(Query.prepare("${anyAttribute('a', 'b', 'c'):equals('hello')}").getVariableImpact().isImpacted("attr")); + assertTrue(Query.prepare("${anyAttribute('a', 'b', 'c'):equals('hello')}").getVariableImpact().isImpacted("a")); + assertTrue(Query.prepare("${anyAttribute('a', 'b', 'c'):equals('hello')}").getVariableImpact().isImpacted("b")); + assertTrue(Query.prepare("${anyAttribute('a', 'b', 'c'):equals('hello')}").getVariableImpact().isImpacted("c")); + + assertFalse(Query.prepare("${allAttributes('a', 'b', 'c'):equals('hello')}").getVariableImpact().isImpacted("attr")); + assertTrue(Query.prepare("${allAttributes('a', 'b', 'c'):equals('hello')}").getVariableImpact().isImpacted("a")); + assertTrue(Query.prepare("${allAttributes('a', 'b', 'c'):equals('hello')}").getVariableImpact().isImpacted("b")); + assertTrue(Query.prepare("${allAttributes('a', 'b', 'c'):equals('hello')}").getVariableImpact().isImpacted("c")); + + assertTrue(Query.prepare("${attr:equals('${attr2}')}").getVariableImpact().isImpacted("attr")); + assertTrue(Query.prepare("${attr:equals('${attr2}')}").getVariableImpact().isImpacted("attr2")); + assertFalse(Query.prepare("${attr:equals('${attr2}')}").getVariableImpact().isImpacted("attr3")); + + assertTrue(Query.prepare("${allMatchingAttributes('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr")); + assertTrue(Query.prepare("${anyMatchingAttribute('a.*'):equals('hello')}").getVariableImpact().isImpacted("attr")); + } + private String evaluate(final String query, final Map<String, String> attrs) { final String evaluated = ((StandardPreparedQuery) Query.prepare(query)).evaluateExpressions(attrs, null); return evaluated;
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestValueLookup.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestValueLookup.java b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestValueLookup.java index ec8ba66..1010ac7 100644 --- a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestValueLookup.java +++ b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestValueLookup.java @@ -25,12 +25,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; -/** - * - */ public class TestValueLookup { @Test + @SuppressWarnings("unchecked") public void testCreateCustomVariableRegistry() { final VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java new file mode 100644 index 0000000..5d631ed --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import javax.xml.bind.annotation.XmlType; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlType(name = "affectedComponent") +public class AffectedComponentDTO { + public static final String COMPONENT_TYPE_PROCESSOR = "PROCESSOR"; + public static final String COMPONENT_TYPE_CONTROLLER_SERVICE = "CONTROLLER_SERVICE"; + + private String parentGroupId; + private String componentId; + private String componentType; + + @ApiModelProperty("The UUID of the Process Group that this component is in") + public String getParentGroupId() { + return parentGroupId; + } + + public void setParentGroupId(final String parentGroupId) { + this.parentGroupId = parentGroupId; + } + + @ApiModelProperty("The UUID of this component") + public String getComponentId() { + return componentId; + } + + public void setComponentId(final String componentId) { + this.componentId = componentId; + } + + @ApiModelProperty(value = "The type of this component", allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE) + public String getComponentType() { + return componentType; + } + + public void setComponentType(final String componentType) { + this.componentType = componentType; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java index 42d605d..f4b6f31 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java @@ -17,6 +17,9 @@ package org.apache.nifi.web.api.dto; import com.wordnik.swagger.annotations.ApiModelProperty; + +import java.util.Map; + import javax.xml.bind.annotation.XmlType; /** @@ -27,6 +30,7 @@ public class ProcessGroupDTO extends ComponentDTO { private String name; private String comments; + private Map<String, String> variables; private Integer runningCount; private Integer stoppedCount; @@ -200,4 +204,16 @@ public class ProcessGroupDTO extends ComponentDTO { this.inactiveRemotePortCount = inactiveRemotePortCount; } + + @ApiModelProperty(value = "The variables that are configured for the Process Group. Note that this map contains only " + + "those variables that are defined on this Process Group and not any variables that are defined in the parent " + + "Process Group, etc. 
I.e., this Map will not contain all variables that are accessible by components in this " + + "Process Group but rather only the variables that are defined for this Process Group itself.", readOnly = true) + public Map<String, String> getVariables() { + return variables; + } + + public void setVariables(final Map<String, String> variables) { + this.variables = variables; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableDTO.java new file mode 100644 index 0000000..c686316 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableDTO.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.api.dto; + +import java.util.HashSet; +import java.util.Set; + +import javax.xml.bind.annotation.XmlType; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlType(name = "variable") +public class VariableDTO { + private String name; + private String value; + private String processGroupId; + private Set<AffectedComponentDTO> affectedComponents = new HashSet<>(); + + @ApiModelProperty("The name of the variable") + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @ApiModelProperty("The value of the variable") + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + @ApiModelProperty(value = "The ID of the Process Group where this Variable is defined", readOnly = true) + public String getProcessGroupId() { + return processGroupId; + } + + public void setProcessGroupId(String groupId) { + this.processGroupId = groupId; + } + + @ApiModelProperty(value = "A set of all components that will be affected if the value of this variable is changed", readOnly = true) + public Set<AffectedComponentDTO> getAffectedComponents() { + return affectedComponents; + } + + public void setAffectedComponents(Set<AffectedComponentDTO> affectedComponents) { + this.affectedComponents = affectedComponents; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryDTO.java new file mode 100644 index 0000000..c106a9a --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryDTO.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import java.util.Set; + +import javax.xml.bind.annotation.XmlType; + +import org.apache.nifi.web.api.entity.VariableEntity; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlType(name = "variableRegistry") +public class VariableRegistryDTO { + private Set<VariableEntity> variables; + private String groupId; + + public void setVariables(final Set<VariableEntity> variables) { + this.variables = variables; + } + + @ApiModelProperty("The variables that are available in this Variable Registry") + public Set<VariableEntity> getVariables() { + return variables; + } + + public void setProcessGroupId(final String groupId) { + this.groupId = groupId; + } + + @ApiModelProperty("The UUID of the Process Group that this Variable Registry belongs to") + public String getProcessGroupId() { + return groupId; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateRequestDTO.java new file mode 100644 index 0000000..06a0dc2 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateRequestDTO.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.api.dto; + +import java.util.Date; +import java.util.List; + +import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +import org.apache.nifi.web.api.dto.util.TimestampAdapter; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlType(name = "variableRegistryUpdateRequest") +public class VariableRegistryUpdateRequestDTO { + private String requestId; + private String processGroupId; + private String uri; + private Date submissionTime = new Date(); + private Date lastUpdated = new Date(); + private boolean complete = false; + private String failureReason; + private List<VariableRegistryUpdateStepDTO> updateSteps; + + + @ApiModelProperty("The unique ID of the Process Group that the variable registry belongs to") + public String getProcessGroupId() { + return processGroupId; + } + + public void setProcessGroupId(String processGroupId) { + this.processGroupId = processGroupId; + } + + @ApiModelProperty(value = "The unique ID of this request.", readOnly = true) + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + @ApiModelProperty(value = "The URI for future requests to this update request.", readOnly = true) + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty(value = "The time at which this request was submitted.", dataType = "string", readOnly = true) + public Date getSubmissionTime() { + return submissionTime; + } + + public void setSubmissionTime(Date submissionTime) { + this.submissionTime = submissionTime; + } + + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty(value = "The last time this request was updated.", dataType = "string", readOnly = true) + public Date getLastUpdated() { + return lastUpdated; + } + + public void setLastUpdated(Date 
lastUpdated) { + this.lastUpdated = lastUpdated; + } + + @ApiModelProperty(value = "The steps that are required in order to complete the request, along with the status of each", readOnly = true) + public List<VariableRegistryUpdateStepDTO> getUpdateSteps() { + return updateSteps; + } + + public void setUpdateSteps(List<VariableRegistryUpdateStepDTO> updateSteps) { + this.updateSteps = updateSteps; + } + + @ApiModelProperty(value = "Whether or not this request has completed", readOnly = true) + public boolean isComplete() { + return complete; + } + + public void setComplete(boolean complete) { + this.complete = complete; + } + + @ApiModelProperty(value = "An explanation of why this request failed, or null if this request has not failed", readOnly = true) + public String getFailureReason() { + return failureReason; + } + + public void setFailureReason(String reason) { + this.failureReason = reason; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateStepDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateStepDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateStepDTO.java new file mode 100644 index 0000000..e1c8cee --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VariableRegistryUpdateStepDTO.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import javax.xml.bind.annotation.XmlType; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlType(name = "varaibleRegistryUpdateStep") +public class VariableRegistryUpdateStepDTO { + private String description; + private boolean complete; + private String failureReason; + + public VariableRegistryUpdateStepDTO() { + } + + @ApiModelProperty(value = "Explanation of what happens in this step", readOnly = true) + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + @ApiModelProperty(value = "Whether or not this step has completed", readOnly = true) + public boolean isComplete() { + return complete; + } + + public void setComplete(boolean complete) { + this.complete = complete; + } + + @ApiModelProperty(value = "An explanation of why this step failed, or null if this step did not fail", readOnly = true) + public String getFailureReason() { + return failureReason; + } + + public void setFailureReason(String reason) { + this.failureReason = reason; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ActivateControllerServicesEntity.java ---------------------------------------------------------------------- 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ActivateControllerServicesEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ActivateControllerServicesEntity.java new file mode 100644 index 0000000..a58f821 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ActivateControllerServicesEntity.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.web.api.entity; + +import java.util.Map; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.web.api.dto.RevisionDTO; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlRootElement(name = "activateControllerServicesEntity") +public class ActivateControllerServicesEntity extends Entity { + public static final String STATE_ENABLED = "ENABLED"; + public static final String STATE_DISABLED = "DISABLED"; + + private String id; + private String state; + private Map<String, RevisionDTO> components; + + @ApiModelProperty("The id of the ProcessGroup") + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + /** + * @return The desired state of the descendant components. Possible states are 'ENABLED' and 'DISABLED' + */ + @ApiModelProperty(value = "The desired state of the descendant components", + allowableValues = STATE_ENABLED + ", " + STATE_DISABLED) + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + @ApiModelProperty("Optional services to schedule. 
If not specified, all authorized descendant controller services will be used.") + public Map<String, RevisionDTO> getComponents() { + return components; + } + + public void setComponents(Map<String, RevisionDTO> components) { + this.components = components; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java index 9aeef40..3e4c9a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java @@ -16,17 +16,21 @@ */ package org.apache.nifi.web.api.entity; -import com.wordnik.swagger.annotations.ApiModelProperty; -import org.apache.nifi.web.api.dto.RevisionDTO; +import java.util.Map; import javax.xml.bind.annotation.XmlRootElement; -import java.util.Map; + +import org.apache.nifi.web.api.dto.RevisionDTO; + +import com.wordnik.swagger.annotations.ApiModelProperty; /** * A serialized representation of this class can be placed in the entity body of a request to the API. 
*/ @XmlRootElement(name = "scheduleComponentEntity") public class ScheduleComponentsEntity extends Entity { + public static final String STATE_RUNNING = "RUNNING"; + public static final String STATE_STOPPED = "STOPPED"; private String id; private String state; @@ -51,7 +55,7 @@ public class ScheduleComponentsEntity extends Entity { */ @ApiModelProperty( value = "The desired state of the descendant components", - allowableValues = "RUNNING, STOPPED" + allowableValues = STATE_RUNNING + ", " + STATE_STOPPED ) public String getState() { return state; http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableEntity.java new file mode 100644 index 0000000..06f6fcf --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableEntity.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.web.api.dto.VariableDTO; +import org.apache.nifi.web.api.dto.WritablePermission; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlRootElement(name = "variableEntity") +public class VariableEntity extends Entity implements WritablePermission { + private VariableDTO variable; + private Boolean canWrite; + + @Override + @ApiModelProperty(value = "Indicates whether the user can write a given resource.", readOnly = true) + public Boolean getCanWrite() { + return canWrite; + } + + @Override + public void setCanWrite(Boolean canWrite) { + this.canWrite = canWrite; + } + + @ApiModelProperty("The variable information") + public VariableDTO getVariable() { + return variable; + } + + public void setVariable(VariableDTO variable) { + this.variable = variable; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryEntity.java new file mode 100644 index 0000000..d876453 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryEntity.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlRootElement(name = "variableRegistryEntity") +public class VariableRegistryEntity extends Entity { + private RevisionDTO groupRevision; + private VariableRegistryDTO variableRegistry; + + + @ApiModelProperty("The Variable Registry.") + public VariableRegistryDTO getVariableRegistry() { + return variableRegistry; + } + + public void setVariableRegistry(VariableRegistryDTO variableRegistry) { + this.variableRegistry = variableRegistry; + } + + @ApiModelProperty("The revision of the Process Group that the Variable Registry belongs to") + public RevisionDTO getProcessGroupRevision() { + return groupRevision; + } + + public void setProcessGroupRevision(RevisionDTO revision) { + this.groupRevision = revision; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryUpdateRequestEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryUpdateRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryUpdateRequestEntity.java new file mode 100644 index 0000000..77257af --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VariableRegistryUpdateRequestEntity.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.api.entity; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VariableRegistryUpdateRequestDTO; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +@XmlRootElement(name = "variableRegistryUpdateRequestEntity") +public class VariableRegistryUpdateRequestEntity extends Entity { + private VariableRegistryUpdateRequestDTO requestDto; + private RevisionDTO processGroupRevision; + + @ApiModelProperty("The revision for the Process Group that owns this variable registry.") + public RevisionDTO getProcessGroupRevision() { + return processGroupRevision; + } + + public void setProcessGroupRevision(RevisionDTO revision) { + this.processGroupRevision = revision; + } + + @ApiModelProperty("The Variable Registry Update Request") + public VariableRegistryUpdateRequestDTO getRequestDto() { + return requestDto; + } + + public void setRequestDto(VariableRegistryUpdateRequestDTO requestDto) { + this.requestDto = requestDto; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java index 2fc55a4..c102746 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -16,6 +16,16 @@ */ package org.apache.nifi.cluster.coordination.http; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.ws.rs.core.StreamingOutput; + import org.apache.nifi.cluster.coordination.http.endpoints.AccessPolicyEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger; @@ -77,15 +87,6 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.StreamingOutput; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - public class StandardHttpResponseMapper implements HttpResponseMapper { private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 45c6ead..ff5a8af 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -16,24 +16,6 @@ */ package org.apache.nifi.controller; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.attribute.expression.language.StandardPropertyValue; -import org.apache.nifi.bundle.Bundle; -import org.apache.nifi.bundle.BundleCoordinate; -import org.apache.nifi.components.ConfigurableComponent; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.util.CharacterFilterUtils; -import org.apache.nifi.util.file.classloader.ClassLoaderUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; @@ -53,6 +35,24 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.service.ControllerServiceNode; +import 
org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.util.CharacterFilterUtils; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent { private static final Logger logger = LoggerFactory.getLogger(AbstractConfiguredComponent.class); @@ -64,7 +64,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone private final AtomicReference<ValidationContext> validationContext = new AtomicReference<>(); private final String componentType; private final String componentCanonicalClass; - private final VariableRegistry variableRegistry; + private final ComponentVariableRegistry variableRegistry; private final ReloadComponent reloadComponent; private final AtomicBoolean isExtensionMissing; @@ -74,7 +74,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone public AbstractConfiguredComponent(final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, - final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, + final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { this.id = id; this.validationContextFactory = validationContextFactory; @@ -541,7 +541,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone } } - protected VariableRegistry getVariableRegistry() { + public ComponentVariableRegistry getVariableRegistry() { return this.variableRegistry; } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java index 069e6ce..940ac21 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java @@ -29,6 +29,7 @@ import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.registry.ComponentVariableRegistry; import java.net.URL; import java.util.Collection; @@ -38,6 +39,7 @@ import java.util.Set; public interface ConfiguredComponent extends ComponentAuthorizable { + @Override public String getIdentifier(); public String getName(); @@ -99,6 +101,11 @@ public interface ConfiguredComponent extends ComponentAuthorizable { */ boolean isDeprecated(); + /** + * @return the variable registry for this component + */ + ComponentVariableRegistry getVariableRegistry(); + @Override default AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) { // if this is a modification request and the reporting task is restricted ensure the user has elevated privileges. 
if this http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java index 5bb8981..c6f30b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; @@ -41,7 +42,7 @@ public interface ProcessScheduler { * @param procNode to start * @throws IllegalStateException if the Processor is disabled */ - void startProcessor(ProcessorNode procNode); + Future<Void> startProcessor(ProcessorNode procNode); /** * Stops scheduling the given processor to run and invokes all methods on @@ -52,7 +53,7 @@ public interface ProcessScheduler { * * @param procNode to stop */ - void stopProcessor(ProcessorNode procNode); + Future<Void> stopProcessor(ProcessorNode procNode); /** * Starts scheduling the given Port to run. 
If the Port is already scheduled @@ -169,12 +170,12 @@ public interface ProcessScheduler { * Disables all of the given Controller Services in the order provided by the List * @param services the controller services to disable */ - void disableControllerServices(List<ControllerServiceNode> services); + CompletableFuture<Void> disableControllerServices(List<ControllerServiceNode> services); /** * Disables the Controller Service so that it can be updated * * @param service to disable */ - void disableControllerService(ControllerServiceNode service); + CompletableFuture<Void> disableControllerService(ControllerServiceNode service); } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index aac5e52..ba2e59b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -16,6 +16,13 @@ */ package org.apache.nifi.controller; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.scheduling.ScheduleState; @@ 
-26,18 +33,12 @@ import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable { private static final Logger logger = LoggerFactory.getLogger(ProcessorNode.class); @@ -46,7 +47,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen public ProcessorNode(final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, - final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, + final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing); this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED); @@ -185,7 +186,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen * the ScheduleState that can be used to ensure that the running state (STOPPED, RUNNING, etc.) 
* as well as the active thread counts are kept in sync */ - public abstract <T extends ProcessContext & ControllerServiceLookup> void stop(ScheduledExecutorService scheduler, + public abstract <T extends ProcessContext & ControllerServiceLookup> CompletableFuture<Void> stop(ScheduledExecutorService scheduler, T processContext, SchedulingAgent schedulingAgent, ScheduleState scheduleState); /** http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index faf530f..3dd1076 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -108,7 +108,7 @@ public interface ControllerServiceNode extends ConfiguredComponent { * implementation of {@link ScheduledExecutorService} used to * initiate service disabling task */ - void disable(ScheduledExecutorService scheduler); + CompletableFuture<Void> disable(ScheduledExecutorService scheduler); /** * @return the ControllerServiceReference that describes which components are referencing this Controller Service http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index f7ba5e5..010ecdf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -19,7 +19,7 @@ package org.apache.nifi.controller.service; import java.net.URL; import java.util.Collection; import java.util.Set; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.bundle.BundleCoordinate; @@ -72,7 +72,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * @param serviceNode the service node * @return a Future that can be used to wait for the service to finish being enabled. */ - Future<Void> enableControllerService(ControllerServiceNode serviceNode); + CompletableFuture<Void> enableControllerService(ControllerServiceNode serviceNode); /** * Enables the collection of services. 
If a service in this collection @@ -90,7 +90,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * * @param serviceNode the node */ - void disableControllerService(ControllerServiceNode serviceNode); + CompletableFuture<Void> disableControllerService(ControllerServiceNode serviceNode); /** * @return a Set of all Controller Services that exist for this service http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java index df18c62..13c5844 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.service; +import java.util.List; import java.util.Set; import org.apache.nifi.controller.ConfiguredComponent; @@ -43,4 +44,13 @@ public interface ControllerServiceReference { * Controller Services) */ Set<ConfiguredComponent> getActiveReferences(); + + /** + * Returns a List of all components that reference this Controller Service (recursively) that + * are of the given type + * + * @param componentType the type of component that is desirable + * @return a List of all components that reference this Controller Service that are of the given type + */ + <T> List<T> 
findRecursiveReferences(Class<T> componentType); } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 122e454..bf789f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -16,12 +16,20 @@ */ package org.apache.nifi.groups; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; + import org.apache.nifi.authorization.resource.ComponentAuthorizable; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Positionable; +import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; @@ -30,13 +38,9 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.Processor; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; -import java.util.Collection; 
-import java.util.List; -import java.util.Set; -import java.util.function.Predicate; - /** * <p> * ProcessGroup objects are containers for processing entities, such as @@ -84,6 +88,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable { /** * @return the ID of the ProcessGroup */ + @Override String getIdentifier(); /** @@ -159,7 +164,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable { * @throws IllegalStateException if the processor is not valid, or is * already running */ - void startProcessor(ProcessorNode processor); + CompletableFuture<Void> startProcessor(ProcessorNode processor); /** * Starts the given Input Port @@ -187,7 +192,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable { * * @param processor to stop */ - void stopProcessor(ProcessorNode processor); + CompletableFuture<Void> stopProcessor(ProcessorNode processor); /** * Stops the given Port @@ -814,6 +819,15 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable { void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup); /** + * Ensures that the given variables can be updated + * + * @param updatedVariables the new set of variable names and values + * + * @throws IllegalStateException if one or more variables that are listed cannot be updated at this time + */ + void verifyCanUpdateVariables(Map<String, String> updatedVariables); + + /** * Adds the given template to this Process Group * * @param template the template to add @@ -853,4 +867,27 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable { * @return a Set of all Templates that belong to this Process Group and any descendant Process Groups */ Set<Template> findAllTemplates(); + + /** + * Updates the variables that are provided by this Process Group + * + * @param variables the variables to provide + * @throws IllegalStateException if the Process Group is not in a state that allows the variables to be updated + 
*/ + void setVariables(Map<String, String> variables); + + /** + * Returns the Variable Registry for this Process Group + * + * @return the Variable Registry for this Process Group + */ + ComponentVariableRegistry getVariableRegistry(); + + /** + * Returns a set of all components that are affected by the variable with the given name + * + * @param variableName the name of the variable + * @return a set of all components that are affected by the variable with the given name + */ + Set<ConfiguredComponent> getComponentsAffectedByVariable(String variableName); } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 2e351a3..94645cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -34,9 +34,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -195,7 +195,10 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import 
org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.variable.MutableVariableRegistry; +import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RemoteResourceManager; @@ -495,15 +498,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new RuntimeException(e); } - processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.variableRegistry, this.nifiProperties); + processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.nifiProperties); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository); processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( - eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, this.variableRegistry)); + eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor)); - final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); - final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties); + final QuartzSchedulingAgent quartzSchedulingAgent = new 
QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); + final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.nifiProperties); processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); // PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been re-configured with executeNode = PRIMARY). processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); @@ -540,7 +543,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.snippetManager = new SnippetManager(); final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler, - nifiProperties, encryptor, this, this.variableRegistry); + nifiProperties, encryptor, this, new MutableVariableRegistry(this.variableRegistry)); rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); setRootGroup(rootGroup); instanceId = ComponentIdGenerator.generateId().toString(); @@ -1022,7 +1025,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws NullPointerException if the argument is null */ public ProcessGroup createProcessGroup(final String id) { - return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, nifiProperties, encryptor, this, variableRegistry); + return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, nifiProperties, encryptor, this, new MutableVariableRegistry(variableRegistry)); } /** @@ -1099,15 +1102,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R creationSuccessful = false; } - final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); + final ComponentVariableRegistry 
componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVarRegistry); final ProcessorNode procNode; if (creationSuccessful) { - procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry, this); + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, + nifiProperties, componentVarRegistry, this); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - procNode = new StandardProcessorNode( - processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, this, true); + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, + componentType, type, nifiProperties, componentVarRegistry, this, true); } final LogRepository logRepository = LogRepositoryFactory.getRepository(id); @@ -1223,7 +1228,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // call OnRemoved for the existing processor using the previous instance class loader try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) { final StandardProcessContext processContext = new StandardProcessContext( - existingNode, controllerServiceProvider, encryptor, getStateManagerProvider().getStateManager(id), variableRegistry); + existingNode, controllerServiceProvider, encryptor, getStateManagerProvider().getStateManager(id)); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext); } finally { 
ExtensionManager.closeURLClassLoader(id, existingInstanceClassLoader); @@ -1943,6 +1948,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R childGroup.setPosition(toPosition(groupDTO.getPosition())); childGroup.setComments(groupDTO.getComments()); childGroup.setName(groupDTO.getName()); + if (groupDTO.getVariables() != null) { + childGroup.setVariables(groupDTO.getVariables()); + } + group.addProcessGroup(childGroup); final FlowSnippetDTO contents = groupDTO.getContents(); @@ -3124,15 +3133,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R creationSuccessful = false; } - final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); + final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVarRegistry); final ReportingTaskNode taskNode; if (creationSuccessful) { - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry, this); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentVarRegistry, this); } else { final String simpleClassName = type.contains(".") ? 
StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, this, true); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, componentVarRegistry, this, true); } taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName()); @@ -3395,7 +3405,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public Future<Void> enableControllerService(final ControllerServiceNode serviceNode) { + public CompletableFuture<Void> enableControllerService(final ControllerServiceNode serviceNode) { return controllerServiceProvider.enableControllerService(serviceNode); } @@ -3405,9 +3415,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public void disableControllerService(final ControllerServiceNode serviceNode) { + public CompletableFuture<Void> disableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanDisable(); - controllerServiceProvider.disableControllerService(serviceNode); + return controllerServiceProvider.disableControllerService(serviceNode); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 01dd35e..58bb90f 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -16,6 +16,34 @@ */ package org.apache.nifi.controller; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; + +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; + import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.Authorizer; @@ -92,33 +120,6 @@ import org.w3c.dom.Node; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; -import javax.xml.XMLConstants; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.validation.Schema; -import javax.xml.validation.SchemaFactory; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; 
-import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; - /** */ public class StandardFlowSynchronizer implements FlowSynchronizer { @@ -1037,6 +1038,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { parentGroup.addProcessGroup(processGroup); } + // Set the variables for the variable registry + final Map<String, String> variables = new HashMap<>(); + final List<Element> variableElements = getChildrenByTagName(processGroupElement, "variable"); + for (final Element variableElement : variableElements) { + final String variableName = variableElement.getAttribute("name"); + final String variableValue = variableElement.getAttribute("value"); + if (variableName == null || variableValue == null) { + continue; + } + + variables.put(variableName, variableValue); + } + + processGroup.setVariables(variables); + // Add Controller Services final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService"); if (!serviceNodeList.isEmpty()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 1ff09d7..36cb62e 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -16,6 +16,31 @@ */ package org.apache.nifi.controller; +import static java.util.Objects.requireNonNull; + +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -56,7 +81,7 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; -import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.CharacterFilterUtils; @@ -68,30 +93,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.springframework.util.Assert; -import java.lang.reflect.InvocationTargetException; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import static java.util.Objects.requireNonNull; - /** * ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists * within a controlled flow. This node keeps track of the processor, its @@ -137,7 +138,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties, - final VariableRegistry variableRegistry, final ReloadComponent reloadComponent) { + final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) { this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, processor.getComponent().getClass().getSimpleName(), processor.getComponent().getClass().getCanonicalName(), nifiProperties, variableRegistry, reloadComponent, false); @@ -147,7 +148,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final ValidationContextFactory validationContextFactory, 
final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider, final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties, - final VariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { + final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { super(uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing); @@ -1372,13 +1373,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * </p> */ @Override - public <T extends ProcessContext & ControllerServiceLookup> void stop(final ScheduledExecutorService scheduler, + public <T extends ProcessContext & ControllerServiceLookup> CompletableFuture<Void> stop(final ScheduledExecutorService scheduler, final T processContext, final SchedulingAgent schedulingAgent, final ScheduleState scheduleState) { final Processor processor = processorRef.get().getProcessor(); LOG.info("Stopping processor: " + processor.getClass()); - + final CompletableFuture<Void> future = new CompletableFuture<>(); if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once scheduleState.incrementActiveThreadCount(); @@ -1405,6 +1406,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable scheduleState.decrementActiveThreadCount(); scheduledState.set(ScheduledState.STOPPED); + future.complete(null); } else { // Not all of the active threads have finished. Try again in 100 milliseconds. 
scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); @@ -1415,16 +1417,17 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } }); } else { - /* - * We do compareAndSet() instead of set() to ensure that Processor - * stoppage is handled consistently including a condition where - * Processor never got a chance to transition to RUNNING state - * before stop() was called. If that happens the stop processor - * routine will be initiated in start() method, otherwise the IF - * part will handle the stop processor routine. - */ + // We do compareAndSet() instead of set() to ensure that Processor + // stoppage is handled consistently including a condition where + // Processor never got a chance to transition to RUNNING state + // before stop() was called. If that happens the stop processor + // routine will be initiated in start() method, otherwise the IF + // part will handle the stop processor routine. this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING); + future.complete(null); } + + return future; } /**
