http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java index 965976a..db68824 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java @@ -58,6 +58,11 @@ public enum DisconnectionCode { MISMATCHED_FLOWS("Node's Flow did not Match Cluster Flow"), /** + * The node was missing a bundle used by the cluster flow. + */ + MISSING_BUNDLE("Node was missing bundle used by Cluster Flow"), + + /** * Cannot communicate with the node */ UNABLE_TO_COMMUNICATE("Unable to Communicate with Node"),
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java index 6c5c1ce..7822db7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java @@ -18,6 +18,9 @@ package org.apache.nifi.cluster.protocol; import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; @@ -34,6 +37,7 @@ public class StandardDataFlow implements Serializable, DataFlow { private final byte[] flow; private final byte[] snippetBytes; private final byte[] authorizerFingerprint; + private final Set<String> missingComponentIds; /** * Constructs an instance. @@ -41,22 +45,27 @@ public class StandardDataFlow implements Serializable, DataFlow { * @param flow a valid flow as bytes, which cannot be null * @param snippetBytes an XML representation of snippets. May be null. * @param authorizerFingerprint the bytes of the Authorizer's fingerprint. May be null when using an external Authorizer. 
+ * @param missingComponentIds the ids of components that were created as missing ghost components * * @throws NullPointerException if flow is null */ - public StandardDataFlow(final byte[] flow, final byte[] snippetBytes, final byte[] authorizerFingerprint) { + public StandardDataFlow(final byte[] flow, final byte[] snippetBytes, final byte[] authorizerFingerprint, final Set<String> missingComponentIds) { if(flow == null){ throw new NullPointerException("Flow cannot be null"); } this.flow = flow; this.snippetBytes = snippetBytes; this.authorizerFingerprint = authorizerFingerprint; + this.missingComponentIds = Collections.unmodifiableSet(missingComponentIds == null + ? new HashSet<>() : new HashSet<>(missingComponentIds)); } public StandardDataFlow(final DataFlow toCopy) { this.flow = copy(toCopy.getFlow()); this.snippetBytes = copy(toCopy.getSnippets()); this.authorizerFingerprint = copy(toCopy.getAuthorizerFingerprint()); + this.missingComponentIds = Collections.unmodifiableSet(toCopy.getMissingComponents() == null + ? 
new HashSet<>() : new HashSet<>(toCopy.getMissingComponents())); } private static byte[] copy(final byte[] bytes) { @@ -79,4 +88,9 @@ public class StandardDataFlow implements Serializable, DataFlow { return authorizerFingerprint; } + @Override + public Set<String> getMissingComponents() { + return missingComponentIds; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java index bfdfe3e..738fa86 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.protocol.jaxb.message; +import java.util.Set; + /** */ public class AdaptedDataFlow { @@ -23,6 +25,7 @@ public class AdaptedDataFlow { private byte[] flow; private byte[] snippets; private byte[] authorizerFingerprint; + private Set<String> missingComponents; public byte[] getFlow() { return flow; @@ -48,4 +51,12 @@ public class AdaptedDataFlow { this.authorizerFingerprint = authorizerFingerprint; } + public Set<String> getMissingComponents() { + return missingComponents; + } + + public void setMissingComponents(Set<String> missingComponents) { + this.missingComponents = missingComponents; 
+ } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java index 054d56e..d31e8e5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java @@ -33,6 +33,7 @@ public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, DataFlow> { aDf.setFlow(df.getFlow()); aDf.setSnippets(df.getSnippets()); aDf.setAuthorizerFingerprint(df.getAuthorizerFingerprint()); + aDf.setMissingComponents(df.getMissingComponents()); } return aDf; @@ -40,7 +41,7 @@ public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, DataFlow> { @Override public DataFlow unmarshal(final AdaptedDataFlow aDf) { - final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getSnippets(), aDf.getAuthorizerFingerprint()); + final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getSnippets(), aDf.getAuthorizerFingerprint(), aDf.getMissingComponents()); return dataFlow; } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java index 8c2cca6..7ced87e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java @@ -24,6 +24,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.stream.IntStream; @@ -57,7 +58,7 @@ public class TestJaxbProtocolUtils { final ConnectionResponseMessage msg = new ConnectionResponseMessage(); final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, true); - final DataFlow dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]); + final DataFlow dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>()); final List<NodeConnectionStatus> nodeStatuses = Collections.singletonList(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)); final List<ComponentRevision> componentRevisions = 
Collections.singletonList(ComponentRevision.fromRevision(new Revision(8L, "client-1", "component-1"))); msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, "instance-1", nodeStatuses, componentRevisions)); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java index 68719fe..bbaeb26 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -26,6 +26,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurati import org.apache.nifi.cluster.coordination.http.endpoints.ControllerEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceTypesEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger; @@ -44,10 +45,12 @@ 
import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpoint import org.apache.nifi.cluster.coordination.http.endpoints.OutputPortsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.PrioritizerTypesEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorTypesEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger; @@ -55,6 +58,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEnd import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskTypesEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger; @@ -124,6 +128,10 @@ public class StandardHttpResponseMapper implements HttpResponseMapper { endpointMergers.add(new 
SystemDiagnosticsEndpointMerger()); endpointMergers.add(new CountersEndpointMerger()); endpointMergers.add(new FlowMerger()); + endpointMergers.add(new ProcessorTypesEndpointMerger()); + endpointMergers.add(new ControllerServiceTypesEndpointMerger()); + endpointMergers.add(new ReportingTaskTypesEndpointMerger()); + endpointMergers.add(new PrioritizerTypesEndpointMerger()); endpointMergers.add(new ControllerConfigurationEndpointMerger()); endpointMergers.add(new CurrentUserEndpointMerger()); endpointMergers.add(new FlowConfigurationEndpointMerger()); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceTypesEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceTypesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceTypesEndpointMerger.java new file mode 100644 index 0000000..b0415a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceTypesEndpointMerger.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.manager.DocumentedTypesMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.DocumentedTypeDTO; +import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity; + +import java.net.URI; +import java.util.Map; +import java.util.Set; + +public class ControllerServiceTypesEndpointMerger extends AbstractNodeStatusEndpoint<ControllerServiceTypesEntity, Set<DocumentedTypeDTO>> { + public static final String CONTROLLER_SERVICE_TYPES_URI_PATTERN = "/nifi-api/flow/controller-service-types"; + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICE_TYPES_URI_PATTERN.equals(uri.getPath()); + } + + @Override + protected Class<ControllerServiceTypesEntity> getEntityClass() { + return ControllerServiceTypesEntity.class; + } + + @Override + protected Set<DocumentedTypeDTO> getDto(ControllerServiceTypesEntity entity) { + return entity.getControllerServiceTypes(); + } + + @Override + protected void mergeResponses(Set<DocumentedTypeDTO> clientDto, Map<NodeIdentifier, Set<DocumentedTypeDTO>> dtoMap, NodeIdentifier selectedNodeId) { + DocumentedTypesMerger.mergeDocumentedTypes(clientDto, dtoMap); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PrioritizerTypesEndpointMerger.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PrioritizerTypesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PrioritizerTypesEndpointMerger.java new file mode 100644 index 0000000..9a9e7a1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PrioritizerTypesEndpointMerger.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.manager.DocumentedTypesMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.DocumentedTypeDTO; +import org.apache.nifi.web.api.entity.PrioritizerTypesEntity; + +import java.net.URI; +import java.util.Map; +import java.util.Set; + +public class PrioritizerTypesEndpointMerger extends AbstractNodeStatusEndpoint<PrioritizerTypesEntity, Set<DocumentedTypeDTO>> { + public static final String PRIORITIZER_TYPES_URI_PATTERN = "/nifi-api/flow/prioritizers"; + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && PRIORITIZER_TYPES_URI_PATTERN.equals(uri.getPath()); + } + + @Override + protected Class<PrioritizerTypesEntity> getEntityClass() { + return PrioritizerTypesEntity.class; + } + + @Override + protected Set<DocumentedTypeDTO> getDto(PrioritizerTypesEntity entity) { + return entity.getPrioritizerTypes(); + } + + @Override + protected void mergeResponses(Set<DocumentedTypeDTO> clientDto, Map<NodeIdentifier, Set<DocumentedTypeDTO>> dtoMap, NodeIdentifier selectedNodeId) { + DocumentedTypesMerger.mergeDocumentedTypes(clientDto, dtoMap); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorTypesEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorTypesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorTypesEndpointMerger.java new file mode 100644 index 0000000..b95f008 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorTypesEndpointMerger.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.manager.DocumentedTypesMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.DocumentedTypeDTO; +import org.apache.nifi.web.api.entity.ProcessorTypesEntity; + +import java.net.URI; +import java.util.Map; +import java.util.Set; + +public class ProcessorTypesEndpointMerger extends AbstractNodeStatusEndpoint<ProcessorTypesEntity, Set<DocumentedTypeDTO>> { + public static final String PROCESSOR_TYPES_URI_PATTERN = "/nifi-api/flow/processor-types"; + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && PROCESSOR_TYPES_URI_PATTERN.equals(uri.getPath()); + } + + @Override + protected Class<ProcessorTypesEntity> getEntityClass() { + return ProcessorTypesEntity.class; + } + + @Override + protected Set<DocumentedTypeDTO> getDto(ProcessorTypesEntity entity) { + return entity.getProcessorTypes(); + } + + @Override + protected void mergeResponses(Set<DocumentedTypeDTO> clientDto, Map<NodeIdentifier, Set<DocumentedTypeDTO>> dtoMap, NodeIdentifier selectedNodeId) { + DocumentedTypesMerger.mergeDocumentedTypes(clientDto, dtoMap); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskTypesEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskTypesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskTypesEndpointMerger.java new file mode 100644 index 0000000..4abb6b7 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskTypesEndpointMerger.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.manager.DocumentedTypesMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.DocumentedTypeDTO; +import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity; + +import java.net.URI; +import java.util.Map; +import java.util.Set; + +public class ReportingTaskTypesEndpointMerger extends AbstractNodeStatusEndpoint<ReportingTaskTypesEntity, Set<DocumentedTypeDTO>> { + public static final String REPORTING_TASK_TYPES_URI_PATTERN = "/nifi-api/flow/reporting-task-types"; + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && REPORTING_TASK_TYPES_URI_PATTERN.equals(uri.getPath()); + } + + @Override + protected Class<ReportingTaskTypesEntity> getEntityClass() { + return ReportingTaskTypesEntity.class; + } + + @Override + protected Set<DocumentedTypeDTO> getDto(ReportingTaskTypesEntity entity) { + return entity.getReportingTaskTypes(); + } + + @Override + protected void mergeResponses(Set<DocumentedTypeDTO> clientDto, Map<NodeIdentifier, Set<DocumentedTypeDTO>> dtoMap, NodeIdentifier selectedNodeId) { + DocumentedTypesMerger.mergeDocumentedTypes(clientDto, dtoMap); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index c27f186..754d370 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -703,7 +703,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } if (includeDataFlow) { - request.setDataFlow(new StandardDataFlow(flowService.createDataFlow())); + request.setDataFlow(new StandardDataFlow(flowService.createDataFlowFromController())); } request.setNodeConnectionStatuses(getConnectionStatuses()); @@ -889,7 +889,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl DataFlow dataFlow = null; if (flowService != null) { try { - dataFlow = flowService.createDataFlow(); + dataFlow = flowService.createDataFlowFromController(); } catch (final IOException ioe) { logger.error("Unable to obtain current dataflow from FlowService in order to provide the flow to " + resolvedNodeIdentifier + ". 
Will tell node to try again later", ioe); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/DocumentedTypesMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/DocumentedTypesMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/DocumentedTypesMerger.java new file mode 100644 index 0000000..91ed849 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/DocumentedTypesMerger.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.DocumentedTypeDTO; + +import java.util.Map; +import java.util.Set; + +public final class DocumentedTypesMerger { + + private DocumentedTypesMerger() {} + + /** + * Merges the documented types from all nodes in the cluster. + * + * @param clientDocumentedTypes the selected documented types + * @param dtoMap the documented types from all nodes + */ + public static void mergeDocumentedTypes(final Set<DocumentedTypeDTO> clientDocumentedTypes, final Map<NodeIdentifier, Set<DocumentedTypeDTO>> dtoMap) { + dtoMap.forEach((nodeIdentifier, nodeDocumentedTypes) -> { + clientDocumentedTypes.retainAll(nodeDocumentedTypes); + }); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java index 2553828..5c419e9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java @@ -80,10 +80,19 @@ public class ProcessorEntityMerger implements ComponentEntityMerger<ProcessorEnt // merge the validation errors and aggregate the property descriptors, if authorized if (nodeProcessor != null) { final NodeIdentifier nodeId = nodeEntry.getKey(); + + // merge the 
validation errors ErrorMerger.mergeErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors()); + + // aggregate the property descriptors nodeProcessor.getConfig().getDescriptors().values().stream().forEach(propertyDescriptor -> { propertyDescriptorMap.computeIfAbsent(propertyDescriptor.getName(), nodeIdToPropertyDescriptor -> new HashMap<>()).put(nodeId, propertyDescriptor); }); + + // if any node does not support multiple versions (null or false), make it unavailable + if (clientDto.getMultipleVersionsAvailable() == null || !Boolean.TRUE.equals(nodeProcessor.getMultipleVersionsAvailable())) { + clientDto.setMultipleVersionsAvailable(Boolean.FALSE); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java index b7f9e82..6447e27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.HashSet; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -139,6 +140,6 @@ public class TestPopularVoteFlowElection { } 
private DataFlow createDataFlow(final byte[] flow) { - return new StandardDataFlow(flow, new byte[0], new byte[0]); + return new StandardDataFlow(flow, new byte[0], new byte[0], new HashSet<>()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index e2577a9..1378d3b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -88,7 +89,7 @@ public class TestNodeClusterCoordinator { }; final FlowService flowService = Mockito.mock(FlowService.class); - final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]); + final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>()); Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow); coordinator.setFlowService(flowService); } @@ -143,7 +144,7 @@ public class TestNodeClusterCoordinator { }; final NodeIdentifier requestedNodeId = 
createNodeId(6); - final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0])); + final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>())); final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage(); requestMsg.setConnectionRequest(request); @@ -184,8 +185,8 @@ public class TestNodeClusterCoordinator { }; final FlowService flowService = Mockito.mock(FlowService.class); - final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]); - Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow); + final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>()); + Mockito.when(flowService.createDataFlowFromController()).thenReturn(dataFlow); coordinator.setFlowService(flowService); coordinator.setConnected(true); @@ -408,7 +409,7 @@ public class TestNodeClusterCoordinator { final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false); final NodeIdentifier conflictingId = new NodeIdentifier("1234", "localhost", 8001, "localhost", 9000, "localhost", 10000, 11000, false); - final ConnectionRequest connectionRequest = new ConnectionRequest(id1, new StandardDataFlow(new byte[0], new byte[0], new byte[0])); + final ConnectionRequest connectionRequest = new ConnectionRequest(id1, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>())); final ConnectionRequestMessage crm = new ConnectionRequestMessage(); crm.setConnectionRequest(connectionRequest); @@ -419,7 +420,7 @@ public class TestNodeClusterCoordinator { final NodeIdentifier resolvedNodeId = responseMessage.getConnectionResponse().getNodeIdentifier(); assertEquals(id1, resolvedNodeId); - final ConnectionRequest conRequest2 = new ConnectionRequest(conflictingId, new 
StandardDataFlow(new byte[0], new byte[0], new byte[0])); + final ConnectionRequest conRequest2 = new ConnectionRequest(conflictingId, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>())); final ConnectionRequestMessage crm2 = new ConnectionRequestMessage(); crm2.setConnectionRequest(conRequest2); @@ -442,7 +443,7 @@ public class TestNodeClusterCoordinator { } private ProtocolMessage requestConnection(final NodeIdentifier requestedNodeId, final NodeClusterCoordinator coordinator) { - final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0])); + final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>())); final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage(); requestMsg.setConnectionRequest(request); return coordinator.handle(requestMsg); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java index fb80ab2..659ec78 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/NopStateProvider.java @@ -112,4 +112,5 @@ public class NopStateProvider implements StateProvider { public Scope[] 
getSupportedScopes() { return new Scope[] {Scope.LOCAL}; } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties index 44b2a4e..d824231 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/nifi.properties @@ -14,7 +14,6 @@ # limitations under the License. # Core Properties # -nifi.version=nifi-test 3.0.0 nifi.flow.configuration.file=./target/flow.xml.gz nifi.flow.configuration.archive.dir=./target/archive/ nifi.flowcontroller.autoResumeState=true http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java index 7c0c5ac..6e2b9fe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java @@ -16,6 +16,8 @@ */ package 
org.apache.nifi.cluster.protocol; +import java.util.Set; + public interface DataFlow { /** @@ -34,4 +36,9 @@ public interface DataFlow { */ public byte[] getAuthorizerFingerprint(); + /** + * @return the component ids of components that were created as a missing ghost component + */ + public Set<String> getMissingComponents(); + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 1053463..345ce64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -18,13 +18,15 @@ package org.apache.nifi.controller; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.ExtensionManager; import 
org.apache.nifi.nar.InstanceClassLoader; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.registry.VariableRegistry; @@ -43,6 +45,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -50,7 +53,6 @@ import java.util.concurrent.locks.ReentrantLock; public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent { private final String id; - private final ConfigurableComponent component; private final ValidationContextFactory validationContextFactory; private final ControllerServiceProvider serviceProvider; private final AtomicReference<String> name; @@ -58,25 +60,24 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone private final String componentType; private final String componentCanonicalClass; private final VariableRegistry variableRegistry; - private final ComponentLog logger; + private final AtomicBoolean isExtensionMissing; private final Lock lock = new ReentrantLock(); private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>(); - public AbstractConfiguredComponent(final ConfigurableComponent component, final String id, + public AbstractConfiguredComponent(final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, - final ComponentLog logger) { + final boolean isExtensionMissing) { this.id = id; - this.component = component; this.validationContextFactory = validationContextFactory; this.serviceProvider = serviceProvider; - this.name = new 
AtomicReference<>(component.getClass().getSimpleName()); + this.name = new AtomicReference<>(componentType); this.componentType = componentType; this.componentCanonicalClass = componentCanonicalClass; this.variableRegistry = variableRegistry; - this.logger = logger; + this.isExtensionMissing = new AtomicBoolean(isExtensionMissing); } @Override @@ -85,6 +86,16 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone } @Override + public void setExtensionMissing(boolean extensionMissing) { + this.isExtensionMissing.set(extensionMissing); + } + + @Override + public boolean isExtensionMissing() { + return isExtensionMissing.get(); + } + + @Override public String getName() { return name.get(); } @@ -114,11 +125,11 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone try { verifyModifiable(); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), id)) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), id)) { boolean classpathChanged = false; for (final Map.Entry<String, String> entry : properties.entrySet()) { // determine if any of the property changes require resetting the InstanceClassLoader - final PropertyDescriptor descriptor = component.getPropertyDescriptor(entry.getKey()); + final PropertyDescriptor descriptor = getComponent().getPropertyDescriptor(entry.getKey()); if (descriptor.isDynamicClasspathModifier()) { classpathChanged = true; } @@ -155,7 +166,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone throw new IllegalArgumentException("Name or Value can not be null"); } - final PropertyDescriptor descriptor = component.getPropertyDescriptor(name); + final PropertyDescriptor descriptor = getComponent().getPropertyDescriptor(name); final String oldValue = properties.put(descriptor, value); if (!value.equals(oldValue)) { @@ -175,7 +186,7 @@ public abstract class 
AbstractConfiguredComponent implements ConfigurableCompone } try { - component.onPropertyModified(descriptor, oldValue, value); + getComponent().onPropertyModified(descriptor, oldValue, value); } catch (final Exception e) { // nothing really to do here... } @@ -197,7 +208,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone throw new IllegalArgumentException("Name can not be null"); } - final PropertyDescriptor descriptor = component.getPropertyDescriptor(name); + final PropertyDescriptor descriptor = getComponent().getPropertyDescriptor(name); String value = null; if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) { @@ -211,9 +222,9 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone } try { - component.onPropertyModified(descriptor, value, null); + getComponent().onPropertyModified(descriptor, value, null); } catch (final Exception e) { - logger.error(e.getMessage(), e); + getLogger().error(e.getMessage(), e); } return true; @@ -231,10 +242,10 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone try { final URL[] urls = ClassLoaderUtils.getURLsForClasspath(modulePaths, null, true); - if (logger.isDebugEnabled()) { - logger.debug("Adding {} resources to the classpath for {}", new Object[] {urls.length, name}); + if (getLogger().isDebugEnabled()) { + getLogger().debug("Adding {} resources to the classpath for {}", new Object[] {urls.length, name}); for (URL url : urls) { - logger.debug(url.getFile()); + getLogger().debug(url.getFile()); } } @@ -243,8 +254,8 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone if (!(classLoader instanceof InstanceClassLoader)) { // Really shouldn't happen, but if we somehow got here and don't have an InstanceClassLoader then log a warning and move on final String classLoaderName = classLoader == null ? 
"null" : classLoader.getClass().getName(); - if (logger.isWarnEnabled()) { - logger.warn(String.format("Unable to modify the classpath for %s, expected InstanceClassLoader, but found %s", name, classLoaderName)); + if (getLogger().isWarnEnabled()) { + getLogger().warn(String.format("Unable to modify the classpath for %s, expected InstanceClassLoader, but found %s", name, classLoaderName)); } return; } @@ -253,14 +264,14 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone instanceClassLoader.setInstanceResources(urls); } catch (MalformedURLException e) { // Shouldn't get here since we are suppressing errors - logger.warn("Error processing classpath resources", e); + getLogger().warn("Error processing classpath resources", e); } } @Override public Map<PropertyDescriptor, String> getProperties() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { - final List<PropertyDescriptor> supported = component.getPropertyDescriptors(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + final List<PropertyDescriptor> supported = getComponent().getPropertyDescriptors(); if (supported == null || supported.isEmpty()) { return Collections.unmodifiableMap(properties); } else { @@ -303,36 +314,124 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone @Override public String toString() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { - return component.toString(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + return getComponent().toString(); } } @Override public Collection<ValidationResult> validate(final ValidationContext context) { - try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { - return component.validate(context); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + final Collection<ValidationResult> validationResults = getComponent().validate(context); + + // validate selected controller services implement the API required by the processor + + final List<PropertyDescriptor> supportedDescriptors = getComponent().getPropertyDescriptors(); + if (null != supportedDescriptors) { + for (final PropertyDescriptor descriptor : supportedDescriptors) { + if (descriptor.getControllerServiceDefinition() == null) { + // skip properties that aren't for a controller service + continue; + } + + final String controllerServiceId = context.getProperty(descriptor).getValue(); + if (controllerServiceId == null) { + // if the property value is null we should already have a validation error + continue; + } + + final ControllerServiceNode controllerServiceNode = getControllerServiceProvider().getControllerServiceNode(controllerServiceId); + if (controllerServiceNode == null) { + // if the node was null we should already have a validation error + continue; + } + + final Class<? 
extends ControllerService> controllerServiceApiClass = descriptor.getControllerServiceDefinition(); + final ClassLoader controllerServiceApiClassLoader = controllerServiceApiClass.getClassLoader(); + + final Bundle controllerServiceApiBundle = ExtensionManager.getBundle(controllerServiceApiClassLoader); + final BundleCoordinate controllerServiceApiCoordinate = controllerServiceApiBundle.getBundleDetails().getCoordinate(); + + final Bundle controllerServiceBundle = ExtensionManager.getBundle(controllerServiceNode.getBundleCoordinate()); + final BundleCoordinate controllerServiceCoordinate = controllerServiceBundle.getBundleDetails().getCoordinate(); + + final boolean matchesApi = matchesApi(controllerServiceBundle, controllerServiceApiCoordinate); + + if (!matchesApi) { + final String controllerServiceType = controllerServiceNode.getComponentType(); + final String controllerServiceApiType = controllerServiceApiClass.getSimpleName(); + + final String explanation = new StringBuilder() + .append(controllerServiceType).append(" - ").append(controllerServiceCoordinate.getVersion()) + .append(" from ").append(controllerServiceCoordinate.getGroup()).append(" - ").append(controllerServiceCoordinate.getId()) + .append(" is not compatible with ").append(controllerServiceApiType).append(" - ").append(controllerServiceApiCoordinate.getVersion()) + .append(" from ").append(controllerServiceApiCoordinate.getGroup()).append(" - ").append(controllerServiceApiCoordinate.getId()) + .toString(); + + validationResults.add(new ValidationResult.Builder() + .input(controllerServiceId) + .subject(descriptor.getDisplayName()) + .valid(false) + .explanation(explanation) + .build()); + } + + } + } + + return validationResults; + } + } + + /** + * Determines if the given controller service node has the required API as an ancestor. 
+ * + * @param controllerServiceImplBundle the bundle of a controller service being referenced by a processor + * @param requiredApiCoordinate the controller service API required by the processor + * @return true if the controller service node has the required API as an ancestor, false otherwise + */ + private boolean matchesApi(final Bundle controllerServiceImplBundle, final BundleCoordinate requiredApiCoordinate) { + // start with the coordinate of the controller service for cases where the API and service are in the same bundle + BundleCoordinate controllerServiceDependencyCoordinate = controllerServiceImplBundle.getBundleDetails().getCoordinate(); + + boolean foundApiDependency = false; + while (controllerServiceDependencyCoordinate != null) { + // determine if the dependency coordinate matches the required API + if (requiredApiCoordinate.equals(controllerServiceDependencyCoordinate)) { + foundApiDependency = true; + break; + } + + // move to the next dependency in the chain, or stop if null + final Bundle controllerServiceDependencyBundle = ExtensionManager.getBundle(controllerServiceDependencyCoordinate); + if (controllerServiceDependencyBundle == null) { + controllerServiceDependencyCoordinate = null; + } else { + controllerServiceDependencyCoordinate = controllerServiceDependencyBundle.getBundleDetails().getDependencyCoordinate(); + } } + + return foundApiDependency; } @Override public PropertyDescriptor getPropertyDescriptor(final String name) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { - return component.getPropertyDescriptor(name); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + return getComponent().getPropertyDescriptor(name); } } @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - try (final 
NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { - component.onPropertyModified(descriptor, oldValue, newValue); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + getComponent().onPropertyModified(descriptor, oldValue, newValue); } } @Override public List<PropertyDescriptor> getPropertyDescriptors() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { - return component.getPropertyDescriptors(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + return getComponent().getPropertyDescriptors(); } } @@ -363,8 +462,8 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { - validationResults = component.validate(validationContext); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + validationResults = getComponent().validate(validationContext); } for (final ValidationResult result : validationResults) { @@ -407,4 +506,19 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone return this.variableRegistry; } + @Override + public void verifyCanUpdateBundle(final BundleCoordinate incomingCoordinate) throws IllegalArgumentException { + final BundleCoordinate existingCoordinate = getBundleCoordinate(); + + // determine if this update is changing the bundle for the processor + if 
(!existingCoordinate.equals(incomingCoordinate)) { + // if it is changing the bundle, only allow it to change to a different version within same group and id + if (!existingCoordinate.getGroup().equals(incomingCoordinate.getGroup()) + || !existingCoordinate.getId().equals(incomingCoordinate.getId())) { + throw new IllegalArgumentException(String.format( + "Unable to update component %s from %s to %s because bundle group and id must be the same.", + getIdentifier(), existingCoordinate.getCoordinate(), incomingCoordinate.getCoordinate())); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java index a0a6060..1c0b7c1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java @@ -24,8 +24,11 @@ import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.resource.ComponentAuthorizable; import org.apache.nifi.authorization.resource.RestrictedComponentsAuthorizable; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; 
import java.util.Collection; import java.util.Map; @@ -50,6 +53,18 @@ public interface ConfiguredComponent extends ComponentAuthorizable { boolean isValid(); + BundleCoordinate getBundleCoordinate(); + + ConfigurableComponent getComponent(); + + ComponentLog getLogger(); + + boolean isExtensionMissing(); + + void setExtensionMissing(boolean extensionMissing); + + void verifyCanUpdateBundle(BundleCoordinate bundleCoordinate) throws IllegalStateException; + /** * @return the any validation errors for this connectable */ http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/LoggableComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/LoggableComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/LoggableComponent.java new file mode 100644 index 0000000..6d03675 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/LoggableComponent.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.logging.ComponentLog; + +/** + * Holder to pass around a ConfigurableComponent with its coordinate and logger. + * + * @param <T> the type of ConfigurableComponent + */ +public class LoggableComponent<T extends ConfigurableComponent> { + + private final T component; + + private final BundleCoordinate bundleCoordinate; + + private final ComponentLog logger; + + public LoggableComponent(final T component, final BundleCoordinate bundleCoordinate, final ComponentLog logger) { + this.component = component; + this.bundleCoordinate = bundleCoordinate; + this.logger = logger; + } + + public T getComponent() { + return component; + } + + public BundleCoordinate getBundleCoordinate() { + return bundleCoordinate; + } + + public ComponentLog getLogger() { + return logger; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 8f405e0..8c78958 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -16,40 +16,39 @@ */ package org.apache.nifi.controller; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.scheduling.ScheduleState; import org.apache.nifi.controller.scheduling.SchedulingAgent; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.scheduling.SchedulingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable { private static final Logger logger = LoggerFactory.getLogger(ProcessorNode.class); protected final AtomicReference<ScheduledState> scheduledState; - public ProcessorNode(final Processor processor, final String id, + public ProcessorNode(final String id, final ValidationContextFactory 
validationContextFactory, final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, - final ComponentLog logger) { - super(processor, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, logger); + final boolean isExtensionMissing) { + super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, isExtensionMissing); this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED); } @@ -77,6 +76,8 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen public abstract Processor getProcessor(); + public abstract void setProcessor(LoggableComponent<Processor> processor); + public abstract void yield(long period, TimeUnit timeUnit); public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java index 11b6cb6..22cc3b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java @@ -47,6 +47,8 @@ public interface ReportingTaskNode extends ConfiguredComponent { ReportingTask getReportingTask(); + void 
setReportingTask(LoggableComponent<ReportingTask> reportingTask); + ReportingContext getReportingContext(); ConfigurationContext getConfigurationContext(); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java index c4aba44..c979745 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java @@ -24,4 +24,7 @@ public class ProcessorInstantiationException extends Exception { super(className, t); } + public ProcessorInstantiationException(final String className) { + super(className); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java index 
ee0bf50..b48f198 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java @@ -17,6 +17,8 @@ package org.apache.nifi.controller.reporting; import java.util.Set; + +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.controller.ReportingTaskNode; /** @@ -31,6 +33,7 @@ public interface ReportingTaskProvider { * @param type the type (fully qualified class name) of the reporting task * to instantiate * @param id the identifier for the Reporting Task + * @param bundleCoordinate the bundle coordinate for the type of reporting task * @param firstTimeAdded whether or not this is the first time that the * reporting task is being added to the flow. I.e., this will be true only * when the user adds the reporting task to the flow, not when the flow is @@ -41,7 +44,7 @@ public interface ReportingTaskProvider { * @throws ReportingTaskInstantiationException if unable to create the * Reporting Task */ - ReportingTaskNode createReportingTask(String type, String id, boolean firstTimeAdded) throws ReportingTaskInstantiationException; + ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded) throws ReportingTaskInstantiationException; /** * @param identifier of node @@ -108,4 +111,15 @@ public interface ReportingTaskProvider { * STOPPED, or if the Reporting Task has active threads */ void disableReportingTask(ReportingTaskNode reportingTask); + + /** + * Changes the underlying ReportingTask held by the node to an instance of the new type. 
+ * + * @param reportingTask the ReportingTaskNode being updated + * @param newType the fully qualified class name of the new type + * @param bundleCoordinate the bundle coordinate of the new type + * @throws ReportingTaskInstantiationException if unable to create an instance of the new type + */ + void changeReportingTaskType(final ReportingTaskNode reportingTask, final String newType, final BundleCoordinate bundleCoordinate) throws ReportingTaskInstantiationException; + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceInvocationHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceInvocationHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceInvocationHandler.java new file mode 100644 index 0000000..0d03e98 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceInvocationHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.service; + +import java.lang.reflect.InvocationHandler; + +public interface ControllerServiceInvocationHandler extends InvocationHandler { + + /** + * Allows changing the underlying node that is used by this invocation handler to check the state of the service. + * + * @param serviceNode the node to be used by this invocation handler + */ + void setServiceNode(final ControllerServiceNode serviceNode); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index c0ff480..8fe4eb2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.service; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; 
+import org.apache.nifi.controller.LoggableComponent; import org.apache.nifi.groups.ProcessGroup; import java.util.List; @@ -52,6 +53,11 @@ public interface ControllerServiceNode extends ConfiguredComponent { ControllerService getProxiedControllerService(); /** + * @return the invocation handler being used by the proxy + */ + ControllerServiceInvocationHandler getInvocationHandler(); + + /** * Returns the list of services that are required to be enabled before this * service is enabled. The returned list is flattened and contains both * immediate and transient dependencies. @@ -167,4 +173,16 @@ public interface ControllerServiceNode extends ConfiguredComponent { * {@link #disable(ScheduledExecutorService)}. */ boolean isActive(); + + /** + * Sets a new proxy and implementation for this node. + * + * @param implementation the actual implementation controller service + * @param proxiedControllerService the proxied controller service + * @param invocationHandler the invocation handler being used by the proxy + */ + void setControllerServiceAndProxy(final LoggableComponent<ControllerService> implementation, + final LoggableComponent<ControllerService> proxiedControllerService, + final ControllerServiceInvocationHandler invocationHandler); + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index 51d54a0..d1111e6 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Set; import org.apache.nifi.annotation.lifecycle.OnAdded; +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; @@ -36,10 +37,11 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { * * @param type of service * @param id of service + * @param bundleCoordinate the coordinate of the bundle for the service * @param firstTimeAdded for service * @return the service node */ - ControllerServiceNode createControllerService(String type, String id, boolean firstTimeAdded); + ControllerServiceNode createControllerService(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded); /** * @param id of the service http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java index 53dbb01..4f5d284 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/services/FlowService.java @@ -103,11 +103,19 @@ public interface FlowService extends LifeCycle { void copyCurrentFlow(OutputStream os) throws IOException; /** - * Creates a DataFlow object from the current flow + * Creates a DataFlow object by first looking for a flow on disk, and falling back to the controller's flow otherwise. * * @return the created DataFlow object * * @throws IOException if unable to read the flow from disk */ DataFlow createDataFlow() throws IOException; + + /** + * Creates a DataFlow object by serializing the flow controller's flow. + * + * @return the created DataFlow object. + */ + DataFlow createDataFlowFromController() throws IOException; + }
