http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java index da5880c..9b3ba94 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -37,6 +38,7 @@ import java.util.UUID; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.client.NiFiRegistryException; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -150,7 +152,7 @@ public class FileBasedFlowRegistry implements FlowRegistry { try { final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier); versionedFlows.add(versionedFlow); - } catch (UnknownResourceException e) { + } catch (NiFiRegistryException e) { continue; } } @@ -164,9 +166,38 @@ public class FileBasedFlowRegistry implements FlowRegistry { return buckets; } + @Override + public Bucket getBucket(String bucketId) throws IOException, NiFiRegistryException { + return getBucket(bucketId, null); + } + + 
@Override + public Bucket getBucket(String bucketId, NiFiUser user) throws IOException, NiFiRegistryException { + return getBuckets(user).stream().filter(b -> b.getIdentifier().equals(bucketId)).findFirst().orElse(null); + } + + @Override + public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { + final Bucket bucket = getBuckets(user).stream().filter(b -> bucketId.equals(b.getIdentifier())).findFirst().orElse(null); + if (bucket == null) { + return Collections.emptySet(); + } + + return bucket.getVersionedFlows(); + } + + @Override + public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { + final VersionedFlow flow = getFlows(bucketId, user).stream().filter(f -> flowId.equals(f.getIdentifier())).findFirst().orElse(null); + if (flow == null) { + return Collections.emptySet(); + } + + return flow.getSnapshotMetadata(); + } @Override - public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException { + public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException { Objects.requireNonNull(flow); Objects.requireNonNull(flow.getBucketIdentifier()); Objects.requireNonNull(flow.getName()); @@ -174,7 +205,7 @@ public class FileBasedFlowRegistry implements FlowRegistry { // Verify that bucket exists final File bucketDir = new File(directory, flow.getBucketIdentifier()); if (!bucketDir.exists()) { - throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier()); + throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier()); } // Verify that there is no flow with the same name in that bucket @@ -213,8 +244,8 @@ public class FileBasedFlowRegistry implements FlowRegistry { } @Override - public synchronized 
VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments) - throws IOException, UnknownResourceException { + public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) + throws IOException, NiFiRegistryException { Objects.requireNonNull(flow); Objects.requireNonNull(flow.getBucketIdentifier()); Objects.requireNonNull(flow.getName()); @@ -223,13 +254,13 @@ public class FileBasedFlowRegistry implements FlowRegistry { // Verify that the bucket exists final File bucketDir = new File(directory, flow.getBucketIdentifier()); if (!bucketDir.exists()) { - throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier()); + throw new NiFiRegistryException("No bucket exists with ID " + flow.getBucketIdentifier()); } // Verify that the flow exists final File flowDir = new File(bucketDir, flow.getIdentifier()); if (!flowDir.exists()) { - throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier()); + throw new NiFiRegistryException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier()); } final File[] versionDirs = flowDir.listFiles(); @@ -291,17 +322,17 @@ public class FileBasedFlowRegistry implements FlowRegistry { } @Override - public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException { + public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException { // Verify that the bucket exists final File bucketDir = new File(directory, bucketId); if (!bucketDir.exists()) { - throw new UnknownResourceException("No bucket exists with ID " + bucketId); + throw new NiFiRegistryException("No bucket exists with ID " + 
bucketId); } // Verify that the flow exists final File flowDir = new File(bucketDir, flowId); if (!flowDir.exists()) { - throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); + throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); } final File[] versionDirs = flowDir.listFiles(); @@ -329,22 +360,22 @@ public class FileBasedFlowRegistry implements FlowRegistry { } @Override - public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException { + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, NiFiRegistryException { // Verify that the bucket exists final File bucketDir = new File(directory, bucketId); if (!bucketDir.exists()) { - throw new UnknownResourceException("No bucket exists with ID " + bucketId); + throw new NiFiRegistryException("No bucket exists with ID " + bucketId); } // Verify that the flow exists final File flowDir = new File(bucketDir, flowId); if (!flowDir.exists()) { - throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId); + throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); } final File versionDir = new File(flowDir, String.valueOf(version)); if (!versionDir.exists()) { - throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version); + throw new NiFiRegistryException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version); } final File contentsFile = new File(versionDir, "flow.xml"); @@ -383,17 +414,17 @@ public class FileBasedFlowRegistry implements FlowRegistry { } @Override - public VersionedFlow getVersionedFlow(final String bucketId,
final String flowId) throws IOException, UnknownResourceException { + public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, NiFiRegistryException { // Verify that the bucket exists final File bucketDir = new File(directory, bucketId); if (!bucketDir.exists()) { - throw new UnknownResourceException("No bucket exists with ID " + bucketId); + throw new NiFiRegistryException("No bucket exists with ID " + bucketId); } // Verify that the flow exists final File flowDir = new File(bucketDir, flowId); if (!flowDir.exists()) { - throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId); + throw new NiFiRegistryException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); } final File flowPropsFile = new File(flowDir, "flow.properties");
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java new file mode 100644 index 0000000..26be69b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.registry.flow; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.client.BucketClient; +import org.apache.nifi.registry.client.FlowClient; +import org.apache.nifi.registry.client.FlowSnapshotClient; +import org.apache.nifi.registry.client.NiFiRegistryClient; +import org.apache.nifi.registry.client.NiFiRegistryClientConfig; +import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient; + +public class RestBasedFlowRegistry implements FlowRegistry { + + private final FlowRegistryClient flowRegistryClient; + private final String identifier; + private final SSLContext sslContext; + private volatile String description; + private volatile String url; + private volatile String name; + + private NiFiRegistryClient registryClient; + + public RestBasedFlowRegistry(final FlowRegistryClient flowRegistryClient, final String identifier, final String url, final SSLContext sslContext, final String name) { + this.flowRegistryClient = flowRegistryClient; + this.identifier = identifier; + this.url = url; + this.name = name; + this.sslContext = sslContext; + } + + private synchronized NiFiRegistryClient getRegistryClient() { + if (registryClient != null) { + return registryClient; + } + + final NiFiRegistryClientConfig config = new NiFiRegistryClientConfig.Builder() + .connectTimeout(30000) + .readTimeout(30000) + .sslContext(sslContext) + .baseUrl(url) + .build(); + + registryClient = new JerseyNiFiRegistryClient.Builder() + .config(config) + .build(); + + return registryClient; + } + + private synchronized void invalidateClient() { + this.registryClient = null; + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public String getDescription() 
{ + return description; + } + + @Override + public void setDescription(final String description) { + this.description = description; + } + + @Override + public String getURL() { + return url; + } + + @Override + public synchronized void setURL(final String url) { + this.url = url; + invalidateClient(); + } + + @Override + public String getName() { + return name; + } + + @Override + public void setName(final String name) { + this.name = name; + } + + @Override + public Set<Bucket> getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException { + final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity()); + return new HashSet<>(bucketClient.getAll()); + } + + @Override + public Bucket getBucket(final String bucketId) throws IOException, NiFiRegistryException { + final BucketClient bucketClient = getRegistryClient().getBucketClient(); + return bucketClient.get(bucketId); + } + + @Override + public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { + final BucketClient bucketClient = getRegistryClient().getBucketClient(user.isAnonymous() ? null : user.getIdentity()); + return bucketClient.get(bucketId); + } + + + @Override + public Set<VersionedFlow> getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { + final FlowClient flowClient = getRegistryClient().getFlowClient(user.isAnonymous() ? null : user.getIdentity()); + return new HashSet<>(flowClient.getByBucket(bucketId)); + } + + @Override + public Set<VersionedFlowSnapshotMetadata> getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { + final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(user.isAnonymous() ? 
null : user.getIdentity()); + return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId)); + } + + @Override + public VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, NiFiRegistryException { + final FlowClient flowClient = getRegistryClient().getFlowClient(); + return flowClient.create(flow); + } + + @Override + public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) + throws IOException, NiFiRegistryException { + + final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(); + final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot(); + versionedFlowSnapshot.setFlowContents(snapshot); + + final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); + metadata.setBucketIdentifier(flow.getBucketIdentifier()); + metadata.setFlowIdentifier(flow.getIdentifier()); + metadata.setFlowName(flow.getName()); + metadata.setTimestamp(System.currentTimeMillis()); + metadata.setVersion(expectedVersion); + metadata.setComments(comments); + + versionedFlowSnapshot.setSnapshotMetadata(metadata); + return snapshotClient.create(versionedFlowSnapshot); + } + + @Override + public int getLatestVersion(final String bucketId, final String flowId) throws IOException, NiFiRegistryException { + return (int) getRegistryClient().getFlowClient().get(bucketId, flowId).getVersionCount(); + } + + @Override + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException { + final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(); + final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version); + + final VersionedProcessGroup contents = flowSnapshot.getFlowContents(); + for (final VersionedProcessGroup child : 
contents.getProcessGroups()) { + populateVersionedContentsRecursively(child); + } + + return flowSnapshot; + } + + private void populateVersionedContentsRecursively(final VersionedProcessGroup group) throws NiFiRegistryException, IOException { + if (group == null) { + return; + } + + final VersionedFlowCoordinates coordinates = group.getVersionedFlowCoordinates(); + if (coordinates != null) { + final String registryUrl = coordinates.getRegistryUrl(); + final String bucketId = coordinates.getBucketId(); + final String flowId = coordinates.getFlowId(); + final int version = coordinates.getVersion(); + + final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl); + if (registryId == null) { + throw new NiFiRegistryException("Flow contains a reference to another Versioned Flow located at URL " + registryUrl + + " but NiFi is not configured to communicate with a Flow Registry at that URL"); + } + + final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); + final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version); + final VersionedProcessGroup contents = snapshot.getFlowContents(); + + group.setComments(contents.getComments()); + group.setConnections(contents.getConnections()); + group.setControllerServices(contents.getControllerServices()); + group.setFunnels(contents.getFunnels()); + group.setInputPorts(contents.getInputPorts()); + group.setLabels(contents.getLabels()); + group.setOutputPorts(contents.getOutputPorts()); + group.setProcessGroups(contents.getProcessGroups()); + group.setProcessors(contents.getProcessors()); + group.setRemoteProcessGroups(contents.getRemoteProcessGroups()); + group.setVariables(contents.getVariables()); + } + + for (final VersionedProcessGroup child : group.getProcessGroups()) { + populateVersionedContentsRecursively(child); + } + } + + @Override + public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, 
NiFiRegistryException { + final FlowClient flowClient = getRegistryClient().getFlowClient(); + return flowClient.get(bucketId, flowId); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java index 828b970..d5d0d86 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java @@ -23,7 +23,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import javax.net.ssl.SSLContext; + +import org.apache.nifi.framework.security.util.SslContextFactory; +import org.apache.nifi.util.NiFiProperties; + public class StandardFlowRegistryClient implements FlowRegistryClient { + private NiFiProperties nifiProperties; private ConcurrentMap<String, FlowRegistry> registryById = new ConcurrentHashMap<>(); @Override @@ -59,6 +65,16 @@ public class StandardFlowRegistryClient implements FlowRegistryClient { registry.setName(registryName); registry.setDescription(description); + } else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) { + final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false); + if (sslContext == null && uriScheme.equalsIgnoreCase("https")) { + throw new 
RuntimeException("Failed to create Flow Registry for URI " + registryUrl + + " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. " + + "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https."); + } + + registry = new RestBasedFlowRegistry(this, registryId, registryUrl, sslContext, registryName); + registry.setDescription(description); } else { throw new IllegalArgumentException("Cannot create Flow Registry with URI of " + registryUrl + " because there are no known implementations of Flow Registries that can handle URIs of scheme " + uriScheme); @@ -72,4 +88,8 @@ public class StandardFlowRegistryClient implements FlowRegistryClient { public FlowRegistry removeFlowRegistry(final String registryId) { return registryById.remove(registryId); } + + public void setProperties(final NiFiProperties nifiProperties) { + this.nifiProperties = nifiProperties; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java index 41b98ed..aaba126 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java @@ -17,21 +17,131 @@ package 
org.apache.nifi.registry.flow; +import java.util.Objects; import java.util.Optional; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; + public class StandardVersionControlInformation implements VersionControlInformation { private final String registryIdentifier; + private volatile String registryName; private final String bucketIdentifier; + private volatile String bucketName; private final String flowIdentifier; + private volatile String flowName; + private volatile String flowDescription; private final int version; private volatile VersionedProcessGroup flowSnapshot; private volatile Boolean modified = null; private volatile Boolean current = null; - public StandardVersionControlInformation(final String registryId, final String bucketId, final String flowId, final int version, + public static class Builder { + private String registryIdentifier; + private String registryName; + private String bucketIdentifier; + private String bucketName; + private String flowIdentifier; + private String flowName; + private String flowDescription; + private int version; + private VersionedProcessGroup flowSnapshot; + private Boolean modified = null; + private Boolean current = null; + + public Builder registryId(String registryId) { + this.registryIdentifier = registryId; + return this; + } + + public Builder registryName(String registryName) { + this.registryName = registryName; + return this; + } + + public Builder bucketId(String bucketId) { + this.bucketIdentifier = bucketId; + return this; + } + + public Builder bucketName(String bucketName) { + this.bucketName = bucketName; + return this; + } + + public Builder flowId(String flowId) { + this.flowIdentifier = flowId; + return this; + } + + public Builder flowName(String flowName) { + this.flowName = flowName; + return this; + } + + public Builder flowDescription(String flowDescription) { + this.flowDescription = flowDescription; + return this; + } + + public Builder version(int version) { + this.version = 
version; + return this; + } + + public Builder modified(Boolean modified) { + this.modified = modified; + return this; + } + + public Builder current(Boolean current) { + this.current = current; + return this; + } + + public Builder flowSnapshot(VersionedProcessGroup snapshot) { + this.flowSnapshot = snapshot; + return this; + } + + public static Builder fromDto(VersionControlInformationDTO dto) { + Builder builder = new Builder(); + builder.registryId(dto.getRegistryId()) + .registryName(dto.getRegistryName()) + .bucketId(dto.getBucketId()) + .bucketName(dto.getBucketName()) + .flowId(dto.getFlowId()) + .flowName(dto.getFlowName()) + .flowDescription(dto.getFlowDescription()) + .current(dto.getCurrent()) + .modified(dto.getModified()) + .version(dto.getVersion()); + + return builder; + } + + public StandardVersionControlInformation build() { + Objects.requireNonNull(registryIdentifier, "Registry ID must be specified"); + Objects.requireNonNull(bucketIdentifier, "Bucket ID must be specified"); + Objects.requireNonNull(flowIdentifier, "Flow ID must be specified"); + Objects.requireNonNull(version, "Version must be specified"); + + final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName, + bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current); + + svci.setBucketName(bucketName); + svci.setFlowName(flowName); + svci.setFlowDescription(flowDescription); + + return svci; + } + } + + + public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version, final VersionedProcessGroup snapshot, final Boolean modified, final Boolean current) { this.registryIdentifier = registryId; + this.registryName = registryName; this.bucketIdentifier = bucketId; this.flowIdentifier = flowId; this.version = version; @@ -40,21 +150,58 @@ public class StandardVersionControlInformation implements VersionControlInformat 
this.current = current; } + @Override public String getRegistryIdentifier() { return registryIdentifier; } @Override + public String getRegistryName() { + return registryName; + } + + public void setRegistryName(final String registryName) { + this.registryName = registryName; + } + + @Override public String getBucketIdentifier() { return bucketIdentifier; } @Override + public String getBucketName() { + return bucketName; + } + + public void setBucketName(final String bucketName) { + this.bucketName = bucketName; + } + + @Override public String getFlowIdentifier() { return flowIdentifier; } + public void setFlowName(String flowName) { + this.flowName = flowName; + } + + @Override + public String getFlowName() { + return flowName; + } + + public void setFlowDescription(String flowDescription) { + this.flowDescription = flowDescription; + } + + @Override + public String getFlowDescription() { + return flowDescription; + } + @Override public int getVersion() { return version; http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index a75d112..a10a1b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -80,14 +80,11 @@ public class NiFiRegistryFlowMapper 
{ // created before attempting to create the connection, where the ConnectableDTO is converted. private Map<String, String> versionedComponentIds = new HashMap<>(); - public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient) { + public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean mapDescendantVersionedFlows) { versionedComponentIds.clear(); - final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true); + final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true, mapDescendantVersionedFlows); - // TODO: Test that this works properly populateReferencedAncestorServices(group, mapped); - - // TODO: Test that this works properly populateReferencedAncestorVariables(group, mapped); return mapped; @@ -149,7 +146,10 @@ public class NiFiRegistryFlowMapper { if (!implicitlyDefinedVariables.isEmpty()) { // Merge the implicit variables with the explicitly defined variables for the Process Group // and set those as the Versioned Group's variables. 
- implicitlyDefinedVariables.putAll(versionedGroup.getVariables()); + if (versionedGroup.getVariables() != null) { + implicitlyDefinedVariables.putAll(versionedGroup.getVariables()); + } + versionedGroup.setVariables(implicitlyDefinedVariables); } } @@ -167,7 +167,7 @@ public class NiFiRegistryFlowMapper { } - private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel) { + private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel, final boolean mapDescendantVersionedFlows) { final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier()); versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier())); versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier())); @@ -192,10 +192,22 @@ public class NiFiRegistryFlowMapper { coordinates.setBucketId(versionControlInfo.getBucketIdentifier()); coordinates.setFlowId(versionControlInfo.getFlowIdentifier()); coordinates.setVersion(versionControlInfo.getVersion()); + versionedGroup.setVersionedFlowCoordinates(coordinates); + + // We need to register the Port ID -> Versioned Component ID's in our versionedComponentIds member variable for all input & output ports. + // Otherwise, we will not be able to lookup the port when connecting to it. 
+ for (final Port port : group.getInputPorts()) { + getId(port.getVersionedComponentId(), port.getIdentifier()); + } + for (final Port port : group.getOutputPorts()) { + getId(port.getVersionedComponentId(), port.getIdentifier()); + } // If the Process Group itself is remotely versioned, then we don't want to include its contents // because the contents are remotely managed and not part of the versioning of this Process Group - return versionedGroup; + if (!mapDescendantVersionedFlows) { + return versionedGroup; + } } } @@ -228,7 +240,7 @@ public class NiFiRegistryFlowMapper { .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setProcessGroups(group.getProcessGroups().stream() - .map(grp -> mapGroup(grp, registryClient, false)) + .map(grp -> mapGroup(grp, registryClient, false, mapDescendantVersionedFlows)) .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setConnections(group.getConnections().stream() http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 8954f39..9e81d22 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -51,7 +51,7 @@ <xs:element name="id" type="NonEmptyStringType" /> <xs:element name="name" type="NonEmptyStringType" /> <xs:element name="url" type="NonEmptyStringType" /> - <xs:element name="description" type="NonEmptyStringType" /> + <xs:element name="description" 
type="xs:string" /> </xs:sequence> </xs:complexType> @@ -180,7 +180,10 @@ <xs:sequence> <xs:element name="registryId" type="NonEmptyStringType" /> <xs:element name="bucketId" type="NonEmptyStringType" /> + <xs:element name="bucketName" type="NonEmptyStringType" /> <xs:element name="flowId" type="NonEmptyStringType" /> + <xs:element name="flowName" type="NonEmptyStringType" /> + <xs:element name="flowDescription" type="xs:string" /> <xs:element name="version" type="NonEmptyStringType" /> </xs:sequence> </xs:complexType> http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml index fc42c62..d9f89aa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml @@ -36,7 +36,9 @@ </bean> <!-- flow registry --> - <bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.StandardFlowRegistryClient" /> + <bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.StandardFlowRegistryClient"> + <property name="properties" ref="nifiProperties" /> + </bean> <!-- flow controller --> <bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean"> http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index e299059..be907ba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -16,6 +16,14 @@ */ package org.apache.nifi.web; +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + import org.apache.nifi.authorization.AuthorizeAccess; import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.user.NiFiUser; @@ -23,7 +31,7 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.registry.flow.UnknownResourceException; +import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedProcessGroup; @@ -86,6 +94,7 @@ import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; import org.apache.nifi.web.api.entity.CurrentUserEntity; +import org.apache.nifi.web.api.entity.FlowComparisonEntity; import org.apache.nifi.web.api.entity.FlowConfigurationEntity; import org.apache.nifi.web.api.entity.FlowEntity; import 
org.apache.nifi.web.api.entity.FunnelEntity; @@ -115,14 +124,6 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.api.entity.VersionedFlowEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; -import java.io.IOException; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; - /** * Defines the NiFiServiceFacade interface. */ @@ -1274,6 +1275,17 @@ public interface NiFiServiceFacade { // ---------------------------------------- /** + * Returns a FlowComparisonEntity that contains all of the local modifications since the Process Group + * was last synchronized with the Flow Registry + * + * @param processGroupId + * @return a FlowComparisonEntity that contains all of the local modifications since the Process Group + * was last synchronized with the Flow Registry + * @throws IllegalStateException if the Process Group with the given ID is not under version control + */ + FlowComparisonEntity getLocalModifications(String processGroupId) throws IOException, NiFiRegistryException; + + /** * Returns the Version Control information for the Process Group with the given ID * * @param processGroupId the ID of the Process Group @@ -1292,7 +1304,7 @@ public interface NiFiServiceFacade { * * @throws IOException if unable to communicate with the Flow Registry */ - VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, UnknownResourceException; + VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, NiFiRegistryException; /** * Creates a snapshot of the Process Group with the given identifier, then creates a new Flow entity in the NiFi Registry @@ -1312,11 +1324,13 @@ public interface NiFiServiceFacade { * @param flow the flow where the snapshot should be persisted * @param snapshot the Snapshot to persist * @param 
comments about the snapshot + * @param expectedVersion the version to save the flow as * @return the snapshot that represents what was stored in the registry * * @throws IOException if unable to communicate with the Flow Registry */ - VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException; + VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion) + throws IOException, NiFiRegistryException; /** * Updates the Version Control Information on the Process Group with the given ID @@ -1351,6 +1365,15 @@ public interface NiFiServiceFacade { VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException; /** + * Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return + * the ID itself as the name + * + * @param flowRegistryId the id of the flow registry + * @return the name of the Flow Registry that is registered with the given ID, or the ID itself if no Flow Registry is registered with the given ID + */ + String getFlowRegistryName(String flowRegistryId); + + /** * Determines which components currently exist in the Process Group with the given identifier and calculates which of those components * would be impacted by updating the Process Group to the provided snapshot * http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index a319f27..89e00ba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -90,7 +90,7 @@ import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.flow.UnknownResourceException; +import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.VersionedConnection; @@ -101,11 +101,13 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.DifferenceType; +import org.apache.nifi.registry.flow.diff.EvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.FlowComparator; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; +import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; @@ -128,6 +130,7 @@ import 
org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; import org.apache.nifi.web.api.dto.ClusterDTO; import org.apache.nifi.web.api.dto.ComponentDTO; +import org.apache.nifi.web.api.dto.ComponentDifferenceDTO; import org.apache.nifi.web.api.dto.ComponentHistoryDTO; import org.apache.nifi.web.api.dto.ComponentReferenceDTO; import org.apache.nifi.web.api.dto.ComponentStateDTO; @@ -205,6 +208,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; import org.apache.nifi.web.api.entity.CurrentUserEntity; +import org.apache.nifi.web.api.entity.FlowComparisonEntity; import org.apache.nifi.web.api.entity.FlowConfigurationEntity; import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.FunnelEntity; @@ -3628,6 +3632,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + final VersionControlInformation currentVci = processGroup.getVersionControlInformation(); + final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1; + // Create a VersionedProcessGroup snapshot of the flow as it is currently. final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId); @@ -3660,8 +3668,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // add first snapshot to the flow in the registry final String comments = versionedFlow.getDescription() == null ? 
"Initial version of flow" : versionedFlow.getDescription(); - registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments); - } catch (final UnknownResourceException e) { + registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments, expectedVersion); + } catch (final NiFiRegistryException e) { throw new IllegalArgumentException(e); } catch (final IOException ioe) { // will result in a 500: Internal Server Error @@ -3671,12 +3679,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // Update the Process Group with the new VersionControlInformation. (Send this to all nodes). final VersionControlInformationDTO vci = new VersionControlInformationDTO(); vci.setBucketId(registeredFlow.getBucketIdentifier()); + vci.setBucketName(registeredFlow.getBucketName()); vci.setCurrent(true); vci.setFlowId(registeredFlow.getIdentifier()); vci.setFlowName(registeredFlow.getName()); + vci.setFlowDescription(registeredFlow.getDescription()); vci.setGroupId(groupId); vci.setModified(false); vci.setRegistryId(registryId); + vci.setRegistryName(getFlowRegistryName(registryId)); vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion()); final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup); @@ -3707,12 +3718,67 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient); + final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false); return versionedGroup; } @Override - 
public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, UnknownResourceException { + public FlowComparisonEntity getLocalModifications(final String processGroupId) throws IOException, NiFiRegistryException { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); + final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); + if (versionControlInfo == null) { + throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control"); + } + + final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryIdentifier()); + if (flowRegistry == null) { + throw new IllegalStateException("Process Group with ID " + processGroupId + " is tracking to a flow in Flow Registry with ID " + versionControlInfo.getRegistryIdentifier() + + " but cannot find a Flow Registry with that identifier"); + } + + final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(), + versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion()); + + + final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); + final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true); + final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); + + final ComparableDataFlow localFlow = new ComparableDataFlow() { + @Override + public VersionedProcessGroup getContents() { + return localGroup; + } + + @Override + public String getName() { + return "Local Flow"; + } + }; + + final ComparableDataFlow registryFlow = new ComparableDataFlow() { + @Override + public VersionedProcessGroup getContents() { + return registryGroup; + } + + @Override + public String getName() { + return "Versioned Flow"; + } + }; + + final FlowComparator flowComparator = new 
StandardFlowComparator(registryFlow, localFlow, new EvolvingDifferenceDescriptor()); + final FlowComparison flowComparison = flowComparator.compare(); + + final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison); + final FlowComparisonEntity entity = new FlowComparisonEntity(); + entity.setComponentDifferences(differenceDtos); + return entity; + } + + @Override + public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, NiFiRegistryException { final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); if (registry == null) { throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); @@ -3721,7 +3787,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return registry.registerVersionedFlow(flow); } - private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, UnknownResourceException { + private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException { final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); if (registry == null) { throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); @@ -3732,13 +3798,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow, - final VersionedProcessGroup snapshot, final String comments) throws IOException, UnknownResourceException { + final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) throws IOException, NiFiRegistryException { final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); if (registry == null) { throw new ResourceNotFoundException("No Flow Registry 
exists with ID " + registryId); } - return registry.registerVersionedFlowSnapshot(flow, snapshot, comments); + return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion); } @Override @@ -3778,12 +3844,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient); + final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient, true); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents); - final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", updatedSnapshot.getFlowContents()); + final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents()); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow); + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, new StaticDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream() @@ -3958,7 +4024,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionedFlowSnapshot snapshot; try { snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion()); - } catch (final UnknownResourceException e) { + } catch (final NiFiRegistryException e) { throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket " + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + 
versionControlInfo.getVersion()); } @@ -3969,6 +4035,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return snapshot; } + @Override + public String getFlowRegistryName(final String flowRegistryId) { + final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(flowRegistryId); + return flowRegistry == null ? flowRegistryId : flowRegistry.getName(); + } + private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException { final VersionedProcessGroup group = snapshot.getFlowContents(); @@ -3993,7 +4065,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionedFlowSnapshot childSnapshot; try { childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion()); - } catch (final UnknownResourceException e) { + } catch (final NiFiRegistryException e) { throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket " + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index b8bdc14..6bf4cca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -39,6 +39,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.nar.NarClassLoaders; +import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.IllegalClusterResourceRequestException; import org.apache.nifi.web.NiFiServiceFacade; @@ -1373,7 +1374,7 @@ public class FlowResource extends ApplicationResource { value = "The registry id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") String id) throws NiFiRegistryException { authorizeFlow(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 11c548f..d24dcbb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -87,8 +87,10 @@ import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.serialization.FlowEncodingVersion; import org.apache.nifi.controller.service.ControllerServiceState; +import 
org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.FlowRegistryUtils; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; @@ -123,6 +125,7 @@ import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity; import org.apache.nifi.web.api.entity.Entity; +import org.apache.nifi.web.api.entity.FlowComparisonEntity; import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.FunnelsEntity; @@ -301,6 +304,53 @@ public class ProcessGroupResource extends ApplicationResource { /** + * Retrieves a list of local modifications to the Process Group since it was last synchronized with the Flow Registry + * + * @param groupId The id of the process group. + * @return A processGroupEntity. + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/local-modifications") + @ApiOperation( + value = "Gets a list of local modifications to the Process Group since it was last synchronized with the Flow Registry", + response = FlowComparisonEntity.class, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}"), + @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getLocalModifications( + @ApiParam( + value = "The process group id.", + required = false + ) + @PathParam("id") final String groupId) throws IOException, NiFiRegistryException { + + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); + final Authorizable processGroup = groupAuthorizable.getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, false, false, true, false); + }); + + final FlowComparisonEntity entity = serviceFacade.getLocalModifications(groupId); + return generateOkResponse(entity).build(); + } + + + /** * Retrieves the Variable Registry for the group with the given ID * * @param groupId the ID of the Process Group @@ -1594,6 +1644,13 @@ public class ProcessGroupResource extends ApplicationResource { // Step 2: Retrieve flow from Flow Registry final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo); + final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); + versionControlInfo.setBucketName(metadata.getBucketName()); + versionControlInfo.setFlowName(metadata.getFlowName()); + versionControlInfo.setFlowDescription(metadata.getFlowDescription()); + + versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId())); 
+ // Step 3: Resolve Bundle info BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents()); @@ -1635,14 +1692,14 @@ public class ProcessGroupResource extends ApplicationResource { // create the process group contents final Revision revision = getRevision(processGroupGroupEntity, processGroupGroupEntity.getComponent().getId()); - final ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent()); + ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent()); final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); if (flowSnapshot != null) { final RevisionDTO revisionDto = entity.getRevision(); final String newGroupId = entity.getComponent().getId(); final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId); - serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId, + entity = serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId, versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false); }
