http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java index 1dc74ab..23f723e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java @@ -36,6 +36,7 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.NiFiProperties; @@ -64,6 +65,7 @@ public class TestStandardReportingContext { private Bundle systemBundle; private BulletinRepository bulletinRepo; private VariableRegistry variableRegistry; + private FlowRegistryClient flowRegistry; private volatile String propsFile = TestStandardReportingContext.class.getResource("/flowcontrollertest.nifi.properties").getFile(); @Before @@ -120,9 +122,10 @@ public class TestStandardReportingContext { authorizer = new MockPolicyBasedAuthorizer(groups1, users1, policies1); variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()); + flowRegistry = Mockito.mock(FlowRegistryClient.class); bulletinRepo = Mockito.mock(BulletinRepository.class); - controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry); + controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry, flowRegistry); } @After
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index b55e98d..1b54c64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -46,6 +46,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.provenance.MockProvenanceRepository; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.After; @@ -714,7 +715,8 @@ public class TestProcessorLifecycle { final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties, mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(), - new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths())); + new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()), + mock(FlowRegistryClient.class)); return new FlowControllerAndSystemBundle(flowController, systemBundle); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java index 7a49103..8044ede 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java @@ -29,6 +29,7 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.NiFiProperties; @@ -80,7 +81,8 @@ public class StandardFlowSerializerTest { final VariableRegistry variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()); final BulletinRepository bulletinRepo = Mockito.mock(BulletinRepository.class); - controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry); + controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, + auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class)); serializer = new StandardFlowSerializer(encryptor); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index a28eb34..27e1678 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -45,6 +46,9 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; @@ -53,6 +57,7 @@ public class MockProcessGroup implements ProcessGroup { private final Map<String, ProcessorNode> processorMap = new HashMap<>(); private final FlowController flowController; private final MutableVariableRegistry variableRegistry = new MutableVariableRegistry(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY); + private VersionControlInformation versionControlInfo; public MockProcessGroup(final FlowController flowController) { this.flowController = flowController; @@ -625,4 +630,35 @@ public class MockProcessGroup implements ProcessGroup { public Set<ConfiguredComponent> getComponentsAffectedByVariable(String variableName) { return Collections.emptySet(); } + + @Override + public Optional<String> getVersionedComponentId() { + return Optional.empty(); + } + + @Override + public void setVersionedComponentId(String versionedComponentId) { + } + + @Override + public VersionControlInformation getVersionControlInformation() { + return versionControlInfo; + } + + @Override + public void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) { + } + + @Override + public void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry) { + } + + @Override + public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty) { + } + + @Override + public void setVersionControlInformation(VersionControlInformation versionControlInformation, Map<String, String> versionedComponentIds) { + this.versionControlInfo = versionControlInformation; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index faa8c0e..84e582c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -16,6 +16,14 @@ */ package org.apache.nifi.web; +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + import org.apache.nifi.authorization.AuthorizeAccess; import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.user.NiFiUser; @@ -23,6 +31,11 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.UnknownResourceException; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.web.api.dto.AccessPolicyDTO; import org.apache.nifi.web.api.dto.AffectedComponentDTO; import org.apache.nifi.web.api.dto.BulletinBoardDTO; @@ -59,6 +72,7 @@ import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.UserDTO; import org.apache.nifi.web.api.dto.UserGroupDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.dto.action.HistoryDTO; import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; @@ -101,13 +115,9 @@ import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; import org.apache.nifi.web.api.entity.VariableRegistryEntity; - -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; +import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; +import org.apache.nifi.web.api.entity.VersionedFlowEntity; /** * Defines the NiFiServiceFacade interface. @@ -491,6 +501,14 @@ public interface NiFiServiceFacade { ProcessorEntity getProcessor(String id); /** + * Gets the Processor transfer object for the specified id, as it is visible to the given user + * + * @param id Id of the processor to return + * @return The Processor transfer object + */ + ProcessorEntity getProcessor(String id, NiFiUser user); + + /** * Gets the processor status. * * @param id id @@ -1066,6 +1084,16 @@ public interface NiFiServiceFacade { RemoteProcessGroupEntity getRemoteProcessGroup(String remoteProcessGroupId); /** + * Gets a remote process group as it is visible to the given user + * + * @param remoteProcessGroupId The id of the remote process group + * @param user the user requesting the action + * @return group + */ + RemoteProcessGroupEntity getRemoteProcessGroup(String remoteProcessGroupId, NiFiUser user); + + + /** * Gets all remote process groups in the a given parent group. * * @param groupId The id of the parent group @@ -1074,6 +1102,15 @@ public interface NiFiServiceFacade { Set<RemoteProcessGroupEntity> getRemoteProcessGroups(String groupId); /** + * Gets all remote process groups in the a given parent group as they are visible to the given user + * + * @param groupId The id of the parent group + * @param user the user making the request + * @return group + */ + Set<RemoteProcessGroupEntity> getRemoteProcessGroups(String groupId, NiFiUser user); + + /** * Gets the remote process group status. * * @param id remote process group @@ -1220,6 +1257,132 @@ public interface NiFiServiceFacade { */ FunnelEntity deleteFunnel(Revision revision, String funnelId); + + // ---------------------------------------- + // Version Control methods + // ---------------------------------------- + + /** + * Returns the Version Control information for the Process Group with the given ID + * + * @param processGroupId the ID of the Process Group + * @return the Version Control information that corresponds to the given Process Group, or <code>null</code> if the + * process group is not under version control + */ + VersionControlInformationEntity getVersionControlInformation(String processGroupId); + + + /** + * Adds the given Versioned Flow to the registry specified by the given ID + * + * @param registryId the ID of the registry + * @param flow the flow to add to the registry + * @return a VersionedFlow that is fully populated, including identifiers + * + * @throws IOException if unable to communicate with the Flow Registry + */ + VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, UnknownResourceException; + + /** + * Creates a snapshot of the Process Group with the given identifier, then creates a new Flow entity in the NiFi Registry + * and adds the snapshot of the Process Group as the first version of that flow. + * + * @param groupId the UUID of the Process Group + * @param requestEntity the details of the flow to create + * @return a VersionControlComponentMappingEntity that contains the information needed to notify a Process Group where it is tracking to and map + * component ID's to their Versioned Component ID's + */ + VersionControlComponentMappingEntity registerFlowWithFlowRegistry(String groupId, VersionedFlowEntity requestEntity); + + /** + * Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id + * + * @param registryId the ID of the Flow Registry to persist the snapshot to + * @param flow the flow where the snapshot should be persisted + * @param snapshot the Snapshot to persist + * @param comments about the snapshot + * @return the snapshot that represents what was stored in the registry + * + * @throws IOException if unable to communicate with the Flow Registry + */ + VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException; + + /** + * Updates the Version Control Information on the Process Group with the given ID + * + * @param processGroupRevision the Revision of the Process Group + * @param processGroupId the ID of the process group to update + * @param versionControlInfo the new Version Control Information + * @param versionedComponentMapping a mapping of component ID to Versioned Component ID + * + * @return a VersionControlInformationEntity that represents the newly updated Version Control information + */ + VersionControlInformationEntity setVersionControlInformation(Revision processGroupRevision, String processGroupId, VersionControlInformationDTO versionControlInfo, + Map<String, String> versionedComponentMapping); + + + /** + * Retrieves the Versioned Flow Snapshot for the coordinates provided by the given Version Control Information DTO + * + * @param versionControlInfo the coordinates of the versioned flow + * @return the VersionedFlowSnapshot that corresponds to the given coordinates + * + * @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found + */ + VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException; + + /** + * Determines which components currently exist in the Process Group with the given identifier and calculates which of those components + * would be impacted by updating the Process Group to the provided snapshot + * + * @param processGroupId the ID of the Process Group to update + * @param updatedSnapshot the snapshot to update the Process Group to + * @param user the user making the request + * @return the set of all components that would be affected by updating the Process Group + */ + Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot, NiFiUser user) throws IOException; + + /** + * Verifies that the Process Group with the given identifier can be updated to the proposed flow + * + * @param groupId the ID of the Process Group to update + * @param proposedFlow the proposed flow + * @param verifyConnectionRemoval whether or not to verify that connections that no longer exist in the proposed flow are eligible for deletion + * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>, + * and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will + * throw an IllegalStateException + */ + void verifyCanUpdate(String groupId, VersionedFlowSnapshot proposedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty); + + /** + * Updates the Process group with the given ID to match the new snapshot + * + * @param revision the revision of the Process Group + * @param groupId the ID of the Process Group + * @param versionControlInfo the Version Control information + * @param snapshot the new snapshot + * @param componentIdSeed the seed to use for generating new component ID's + * @return the Process Group + */ + ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed, + boolean verifyNotModified); + + /** + * Updates the Process group with the given ID to match the new snapshot + * + * @param user the user performing the request + * @param revision the revision of the Process Group + * @param groupId the ID of the Process Group + * @param versionControlInfo the Version Control information + * @param snapshot the new snapshot + * @param componentIdSeed the seed to use for generating new component ID's + * @return the Process Group + */ + ProcessGroupEntity updateProcessGroup(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed, + boolean verifyNotModified); + + void setFlowRegistryClient(FlowRegistryClient flowRegistryClient); + // ---------------------------------------- // Component state methods // ---------------------------------------- @@ -1290,6 +1453,7 @@ public interface NiFiServiceFacade { */ void clearReportingTaskState(String reportingTaskId); + // ---------------------------------------- // Label methods // ---------------------------------------- @@ -1510,6 +1674,15 @@ public interface NiFiServiceFacade { ControllerServiceEntity getControllerService(String controllerServiceId); /** + * Gets the specified controller service as it is visible to the given user + * + * @param controllerServiceId id + * @param user the user making the request + * @return service + */ + ControllerServiceEntity getControllerService(String controllerServiceId, NiFiUser user); + + /** * Get the descriptor for the specified property of the specified controller service. * * @param id id http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java index d0230db..f7f3b90 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java @@ -138,6 +138,12 @@ public class NiFiServiceFacadeLock { return proceedWithReadLock(proceedingJoinPoint); } + @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && " + + "execution(* register*(..))") + public Object registerLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { + return proceedWithReadLock(proceedingJoinPoint); + } + private Object proceedWithReadLock(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable { final long beforeLock = System.nanoTime(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 9caabd6..d3a5fd0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,32 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; import org.apache.nifi.action.FlowChangeAction; @@ -85,6 +110,31 @@ import org.apache.nifi.history.History; import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.history.PreviousValue; import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.flow.ConnectableComponent; +import org.apache.nifi.registry.flow.FlowRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.RemoteFlowCoordinates; +import org.apache.nifi.registry.flow.UnknownResourceException; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedComponent; +import org.apache.nifi.registry.flow.VersionedConnection; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.diff.ComparableDataFlow; +import org.apache.nifi.registry.flow.diff.DifferenceType; +import org.apache.nifi.registry.flow.diff.FlowComparator; +import org.apache.nifi.registry.flow.diff.FlowComparison; +import org.apache.nifi.registry.flow.diff.FlowDifference; +import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; +import org.apache.nifi.registry.flow.diff.StandardFlowComparator; +import org.apache.nifi.registry.flow.mapping.InstantiatedConnectableComponent; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; @@ -140,6 +190,8 @@ import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.UserDTO; import org.apache.nifi.web.api.dto.UserGroupDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import org.apache.nifi.web.api.dto.VersionedFlowDTO; import org.apache.nifi.web.api.dto.action.HistoryDTO; import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; @@ -197,6 +249,9 @@ import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.api.entity.VariableRegistryEntity; +import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; +import org.apache.nifi.web.api.entity.VersionedFlowEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.AccessPolicyDAO; import org.apache.nifi.web.dao.ConnectionDAO; @@ -224,29 +279,7 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; +import com.google.common.collect.Sets; /** * Implementation of NiFiServiceFacade that performs revision checking. @@ -285,6 +318,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // administrative services private AuditService auditService; + // flow registry + private FlowRegistryClient flowRegistryClient; + // properties private NiFiProperties properties; private DtoFactory dtoFactory; @@ -925,7 +961,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public RevisionUpdate<ScheduleComponentsEntity> update() { // schedule the components - processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet()); + processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet()); // update the revisions final Map<String, Revision> updatedRevisions = new HashMap<>(); @@ -950,7 +986,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); return activateControllerServices(user, processGroupId, state, serviceRevisions); } @@ -1010,6 +1045,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions); } + @Override public NodeDTO updateNode(final NodeDTO nodeDTO) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); @@ -2512,9 +2548,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entityFactory.createStatusHistoryEntity(dto, permissions); } - private ProcessorEntity createProcessorEntity(final ProcessorNode processor) { + private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user); final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier())); final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -2524,8 +2560,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public Set<ProcessorEntity> getProcessors(final String groupId, final boolean includeDescendants) { final Set<ProcessorNode> processors = processorDAO.getProcessors(groupId, includeDescendants); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); return processors.stream() - .map(processor -> createProcessorEntity(processor)) + .map(processor -> createProcessorEntity(processor, user)) .collect(Collectors.toSet()); } @@ -2582,8 +2619,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessorEntity getProcessor(final String id) { + return getProcessor(id, NiFiUserUtils.getNiFiUser()); + } + + @Override + public ProcessorEntity getProcessor(final String id, final NiFiUser user) { final ProcessorNode processor = processorDAO.getProcessor(id); - return createProcessorEntity(processor); + return createProcessorEntity(processor, user); } @Override @@ -3103,9 +3145,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { .collect(Collectors.toSet()); } - private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg) { + private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg, final NiFiUser user) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(rpg.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg, user); final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier())); final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -3114,9 +3156,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId) { + return getRemoteProcessGroups(groupId, NiFiUserUtils.getNiFiUser()); + } + + @Override + public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId, final NiFiUser user) { final Set<RemoteProcessGroup> rpgs = remoteProcessGroupDAO.getRemoteProcessGroups(groupId); return rpgs.stream() - .map(rpg -> createRemoteGroupEntity(rpg)) + .map(rpg -> createRemoteGroupEntity(rpg, user)) .collect(Collectors.toSet()); } @@ -3150,8 +3197,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId) { + return getRemoteProcessGroup(remoteProcessGroupId, NiFiUserUtils.getNiFiUser()); + } + + @Override + public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId, final NiFiUser user) { final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); - return createRemoteGroupEntity(rpg); + return createRemoteGroupEntity(rpg, user); } @Override @@ -3307,8 +3359,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ControllerServiceEntity getControllerService(final String controllerServiceId) { + return getControllerService(controllerServiceId, NiFiUserUtils.getNiFiUser()); + } + + @Override + public ControllerServiceEntity getControllerService(final String controllerServiceId, final NiFiUser user) { final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); - return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId), NiFiUserUtils.getNiFiUser()); + return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId), user); } @Override @@ -3375,6 +3432,415 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entityFactory.createStatusHistoryEntity(dto, permissions); } + @Override + public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final VersionedFlowEntity requestEntity) { + // Create a VersionedProcessGroup snapshot of the flow as it is currently. + final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId); + + final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow(); + final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId(); + + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setBucketIdentifier(versionedFlowDto.getBucketId()); + versionedFlow.setCreatedTimestamp(System.currentTimeMillis()); + versionedFlow.setDescription(versionedFlowDto.getDescription()); + versionedFlow.setModifiedTimestamp(versionedFlow.getCreatedTimestamp()); + versionedFlow.setName(versionedFlowDto.getFlowName()); + versionedFlow.setIdentifier(flowId); + + // Add the Versioned Flow and first snapshot to the Flow Registry + final String registryId = requestEntity.getVersionedFlow().getRegistryId(); + final VersionedFlowSnapshot registeredSnapshot; + final VersionedFlow registeredFlow; + + String action = "create the flow"; + try { + // first, create the flow in the registry, if necessary + if (versionedFlowDto.getFlowId() == null) { + registeredFlow = registerVersionedFlow(registryId, versionedFlow); + } else { + registeredFlow = getVersionedFlow(registryId, versionedFlowDto.getBucketId(), versionedFlowDto.getFlowId()); + } + + action = "add the local flow to the Flow Registry as the first Snapshot"; + + // add first snapshot to the flow in the registry + final String comments = versionedFlow.getDescription() == null ? "Initial version of flow" : versionedFlow.getDescription(); + registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments); + } catch (final UnknownResourceException e) { + throw new IllegalArgumentException(e); + } catch (final IOException ioe) { + // will result in a 500: Internal Server Error + throw new RuntimeException("Failed to communicate with Flow Registry when attempting to " + action); + } + + // Update the Process Group with the new VersionControlInformation. (Send this to all nodes). + final VersionControlInformationDTO vci = new VersionControlInformationDTO(); + vci.setBucketId(registeredFlow.getBucketIdentifier()); + vci.setCurrent(true); + vci.setFlowId(registeredFlow.getIdentifier()); + vci.setGroupId(groupId); + vci.setModified(false); + vci.setRegistryId(registryId); + vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion()); + + final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup); + + final Revision groupRevision = revisionManager.getRevision(groupId); + final RevisionDTO groupRevisionDto = dtoFactory.createRevisionDTO(groupRevision); + + final VersionControlComponentMappingEntity entity = new VersionControlComponentMappingEntity(); + entity.setVersionControlInformation(vci); + entity.setProcessGroupRevision(groupRevisionDto); + entity.setVersionControlComponentMapping(mapping); + return entity; + } + + @Override + public VersionControlInformationEntity getVersionControlInformation(final String groupId) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); + if (versionControlInfo == null) { + return null; + } + + final VersionControlInformationDTO versionControlDto = dtoFactory.createVersionControlInformationDto(versionControlInfo); + final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId)); + return entityFactory.createVersionControlInformationEntity(versionControlDto, groupRevision); + } + + private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); + final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); + final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient); + return versionedGroup; + } + + @Override + public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, UnknownResourceException { + final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); + if (registry == null) { + throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); + } + + return registry.registerVersionedFlow(flow); + } + + private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, UnknownResourceException { + final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); + if (registry == null) { + throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); + } + + return registry.getVersionedFlow(bucketId, flowId); + } + + @Override + public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow, + final VersionedProcessGroup snapshot, final String comments) throws IOException, UnknownResourceException { + final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); + if (registry == null) { + throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); + } + + return registry.registerVersionedFlowSnapshot(flow, snapshot, comments); + } + + @Override + public VersionControlInformationEntity setVersionControlInformation(final Revision revision, final String processGroupId, + final VersionControlInformationDTO versionControlInfo, final Map<String, String> versionedComponentMapping) { + + final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); + + final RevisionUpdate<VersionControlInformationDTO> snapshot = updateComponent(revision, + group, + () -> processGroupDAO.updateVersionControlInformation(versionControlInfo, versionedComponentMapping), + processGroup -> dtoFactory.createVersionControlInformationDto(processGroup.getVersionControlInformation())); + + return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification())); + } + + @Override + public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty); + } + + @Override + public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException { + final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); + + final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); + final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient); + + final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents); + final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", updatedSnapshot.getFlowContents()); + + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow); + final FlowComparison comparison = flowComparator.compare(); + + final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream() + .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow. + .map(difference -> { + final VersionedComponent localComponent = difference.getComponentA(); + + final String state; + switch (localComponent.getComponentType()) { + case CONTROLLER_SERVICE: + final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId(); + state = controllerServiceDAO.getControllerService(serviceId).getState().name(); + break; + case PROCESSOR: + final String processorId = ((InstantiatedVersionedProcessor) localComponent).getInstanceId(); + state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name(); + break; + case REMOTE_INPUT_PORT: + final InstantiatedVersionedRemoteGroupPort inputPort = (InstantiatedVersionedRemoteGroupPort) localComponent; + state = remoteProcessGroupDAO.getRemoteProcessGroup(inputPort.getInstanceGroupId()).getInputPort(inputPort.getInstanceId()).getScheduledState().name(); + break; + case REMOTE_OUTPUT_PORT: + final InstantiatedVersionedRemoteGroupPort outputPort = (InstantiatedVersionedRemoteGroupPort) localComponent; + state = remoteProcessGroupDAO.getRemoteProcessGroup(outputPort.getInstanceGroupId()).getOutputPort(outputPort.getInstanceId()).getScheduledState().name(); + break; + default: + state = null; + break; + } + + return createAffectedComponentEntity((InstantiatedVersionedComponent) localComponent, localComponent.getComponentType().name(), state, user); + }) + .collect(Collectors.toCollection(HashSet::new)); + + for (final FlowDifference difference : comparison.getDifferences()) { + VersionedComponent component = difference.getComponentA(); + if (component == null) { + component = difference.getComponentB(); + } + + if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) { + final VersionedConnection connection = (VersionedConnection) component; + + final ConnectableComponent source = connection.getSource(); + final ConnectableComponent destination = connection.getDestination(); + + affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) source, user)); + affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) destination, user)); + } + } + + return affectedComponents; + } + + private String getComponentState(final InstantiatedConnectableComponent localComponent) { + final String componentId = localComponent.getInstanceId(); + final String groupId = localComponent.getInstanceGroupId(); + + switch (localComponent.getType()) { + case PROCESSOR: + return processorDAO.getProcessor(componentId).getPhysicalScheduledState().name(); + case REMOTE_INPUT_PORT: + return remoteProcessGroupDAO.getRemoteProcessGroup(groupId).getInputPort(componentId).getScheduledState().name(); + case REMOTE_OUTPUT_PORT: + return remoteProcessGroupDAO.getRemoteProcessGroup(groupId).getOutputPort(componentId).getScheduledState().name(); + default: + return null; + } + } + + private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) { + final AffectedComponentEntity entity = new AffectedComponentEntity(); + entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId()))); + entity.setId(instance.getInstanceId()); + + final Authorizable authorizable = getAuthorizable(componentTypeName, instance); + final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user); + entity.setPermissions(permissionsDto); + + final AffectedComponentDTO dto = new AffectedComponentDTO(); + dto.setId(instance.getInstanceId()); + dto.setReferenceType(componentTypeName); + dto.setProcessGroupId(instance.getInstanceGroupId()); + dto.setState(componentState); + + entity.setComponent(dto); + return entity; + } + + private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedConnectableComponent instance, final NiFiUser user) { + final AffectedComponentEntity entity = new AffectedComponentEntity(); + entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId()))); + entity.setId(instance.getInstanceId()); + + final String componentTypeName = instance.getType().name(); + final Authorizable authorizable = getAuthorizable(componentTypeName, instance); + final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user); + entity.setPermissions(permissionsDto); + + final AffectedComponentDTO dto = new AffectedComponentDTO(); + dto.setId(instance.getInstanceId()); + dto.setReferenceType(componentTypeName); + dto.setProcessGroupId(instance.getInstanceGroupId()); + dto.setState(getComponentState(instance)); + + entity.setComponent(dto); + return entity; + } + + private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) { + final String componentId = versionedComponent.getInstanceId(); + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE.name())) { + return authorizableLookup.getControllerService(componentId).getAuthorizable(); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONNECTION.name())) { + return authorizableLookup.getConnection(componentId).getAuthorizable(); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.FUNNEL.name())) { + return authorizableLookup.getFunnel(componentId); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.INPUT_PORT.name())) { + return authorizableLookup.getInputPort(componentId); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.OUTPUT_PORT.name())) { + return authorizableLookup.getOutputPort(componentId); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.LABEL.name())) { + return authorizableLookup.getLabel(componentId); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP.name())) { + return authorizableLookup.getProcessGroup(componentId).getAuthorizable(); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESSOR.name())) { + return authorizableLookup.getProcessor(componentId).getAuthorizable(); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_INPUT_PORT.name())) { + return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId()); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_OUTPUT_PORT.name())) { + return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId()); + } + + if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) { + return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId()); + } + + return null; + } + + @Override + public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo) throws IOException { + final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId()); + if (flowRegistry == null) { + throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId()); + } + + final VersionedFlowSnapshot snapshot; + try { + snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion()); + } catch (final UnknownResourceException e) { + throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket " + + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion()); + } + + // If this Flow has a reference to a remote flow, we need to pull that remote flow as well + populateVersionedChildFlows(snapshot); + + return snapshot; + } + + private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException { + final VersionedProcessGroup group = snapshot.getFlowContents(); + + for (final VersionedProcessGroup child : group.getProcessGroups()) { + populateVersionedFlows(child); + } + } + + private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException { + final RemoteFlowCoordinates remoteCoordinates = group.getRemoteFlowCoordinates(); + + if (remoteCoordinates != null) { + final String registryUrl = remoteCoordinates.getRegistryUrl(); + final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl); + if (registryId == null) { + throw new IllegalArgumentException("Process Group with ID " + group.getIdentifier() + " is under Version Control, referencing a Flow Registry at URL [" + registryUrl + + "], but no Flow Registry is currently registered for that URL."); + } + + final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); + + final VersionedFlowSnapshot childSnapshot; + try { + childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion()); + } catch (final UnknownResourceException e) { + throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket " + + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion()); + } + + final VersionedProcessGroup fetchedGroup = childSnapshot.getFlowContents(); + group.setComments(fetchedGroup.getComments()); + group.setPosition(fetchedGroup.getPosition()); + group.setName(fetchedGroup.getName()); + group.setVariables(fetchedGroup.getVariables()); + + group.setConnections(new LinkedHashSet<>(fetchedGroup.getConnections())); + group.setControllerServices(new LinkedHashSet<>(fetchedGroup.getControllerServices())); + group.setFunnels(new LinkedHashSet<>(fetchedGroup.getFunnels())); + group.setInputPorts(new LinkedHashSet<>(fetchedGroup.getInputPorts())); + group.setLabels(new LinkedHashSet<>(fetchedGroup.getLabels())); + group.setOutputPorts(new LinkedHashSet<>(fetchedGroup.getOutputPorts())); + group.setProcessGroups(new LinkedHashSet<>(fetchedGroup.getProcessGroups())); + group.setProcessors(new LinkedHashSet<>(fetchedGroup.getProcessors())); + group.setRemoteProcessGroups(new LinkedHashSet<>(fetchedGroup.getRemoteProcessGroups())); + } + + for (final VersionedProcessGroup child : group.getProcessGroups()) { + populateVersionedFlows(child); + } + } + + + @Override + public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, + final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) { + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + return updateProcessGroup(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified); + } + + @Override + public ProcessGroupEntity updateProcessGroup(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, + final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) { + + final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId); + final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision, + processGroupNode, + () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified), + processGroup -> dtoFactory.createProcessGroupDto(processGroup)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier())); + final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier())); + final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); + return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities); + } + + + @Override + public void setFlowRegistryClient(final FlowRegistryClient client) { + this.flowRegistryClient = client; + } + private AuthorizationResult authorizeAction(final Action action) { final String sourceId = action.getSourceId(); final Component type = action.getSourceType(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 7118e01..531823a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -16,40 +16,8 @@ */ package org.apache.nifi.web.api; -import static javax.ws.rs.core.Response.Status.NOT_FOUND; -import static org.apache.commons.lang3.StringUtils.isEmpty; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; - import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.core.CacheControl; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.ResponseBuilder; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriBuilderException; -import javax.ws.rs.core.UriInfo; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.AuthorizeAccess; @@ -69,6 +37,7 @@ import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.framework.security.util.SslContextFactory; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.exception.BadRequestException; @@ -77,6 +46,7 @@ import org.apache.nifi.remote.exception.NotAuthorizedException; import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.remote.protocol.http.HttpHeaders; import org.apache.nifi.util.ComponentIdGenerator; +import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; @@ -87,9 +57,45 @@ import org.apache.nifi.web.api.entity.TransactionResultEntity; import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.apache.nifi.web.security.util.CacheKey; import org.apache.nifi.web.util.WebUtils; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.client.Client; +import javax.ws.rs.core.CacheControl; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriBuilderException; +import javax.ws.rs.core.UriInfo; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; + /** * Base class for controllers. */ @@ -221,6 +227,16 @@ public abstract class ApplicationResource { return Optional.of(idGenerationSeed); } + protected Client createJerseyClient() { + final NiFiProperties properties = getProperties(); + final ClientConfig clientConfig = new ClientConfig(); + final int connectionTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(), TimeUnit.MILLISECONDS); + final int readTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS); + clientConfig.property(ClientProperties.READ_TIMEOUT, readTimeout); + clientConfig.property(ClientProperties.CONNECT_TIMEOUT, connectionTimeout); + clientConfig.property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE); + return WebUtils.createClient(clientConfig, SslContextFactory.createSslContext(properties)); + } /** * Generates an Ok response with no content. http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index c9aea4f..38e8891 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -595,6 +595,24 @@ public class FlowResource extends ApplicationResource { componentIds.add(outputPort.getIdentifier()); }); + // ensure authorized for each remote input port we will attempt to schedule + group.findAllRemoteProcessGroups().stream() + .flatMap(rpg -> rpg.getInputPorts().stream()) + .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) + .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) + .forEach(port -> { + componentIds.add(port.getIdentifier()); + }); + + // ensure authorized for each remote output port we will attempt to schedule + group.findAllRemoteProcessGroups().stream() + .flatMap(rpg -> rpg.getOutputPorts().stream()) + .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) + .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) + .forEach(port -> { + componentIds.add(port.getIdentifier()); + }); + return componentIds; }); http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index b866677..ebed0ad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -763,7 +763,6 @@ public class ProcessGroupResource extends ApplicationResource { private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors, final Collection<AffectedComponentDTO> affectedServices, final VariableRegistryUpdateRequest updateRequest, final VariableRegistryEntity requestEntity) throws InterruptedException, IOException { - final Pause pause = createPause(updateRequest); // stop processors @@ -805,8 +804,6 @@ public class ProcessGroupResource extends ApplicationResource { logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId); updateRequest.getStartProcessorsStep().setComplete(true); } - - updateRequest.setComplete(true); } /** @@ -1414,6 +1411,7 @@ public class ProcessGroupResource extends ApplicationResource { * @param <T> type of class * @return the response entity */ + @SuppressWarnings("unchecked") private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) { T entity = (T) nodeResponse.getUpdatedEntity(); if (entity == null) {
