This is an automated email from the ASF dual-hosted git repository.
bsimon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2473683ce5 NIFI-10918: When fetching a flow from a Flow Registry, if
it references any 'internal versioned flows' instead of requiring that we have
a client configured for the appropriate URL, attempt to fetch the flow from
each client. We will start with the clients that do report that they can handle
the URL but will try others as well. As soon as we successfully fetch the flow,
we stop.
2473683ce5 is described below
commit 2473683ce59e187b98af6be1bb7b0f2e046a0986
Author: Mark Payne <[email protected]>
AuthorDate: Wed Nov 30 18:08:28 2022 -0500
NIFI-10918: When fetching a flow from a Flow Registry, if it references any
'internal versioned flows' instead of requiring that we have a client
configured for the appropriate URL, attempt to fetch the flow from each client.
We will start with the clients that do report that they can handle the URL but
will try others as well. As soon as we successfully fetch the flow, we stop.
NIFI-10918: Fixed checkstyle violations
This closes #6736
Signed-off-by: Bence Simon <[email protected]>
---
.../flow/StandardFlowRegistryClientNode.java | 44 +++++++++++++++++-----
.../apache/nifi/util/FlowDifferenceFilters.java | 27 +++++++++++++
2 files changed, 61 insertions(+), 10 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
index c2a5462ebc..98a0925880 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
@@ -46,11 +46,15 @@ import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,6 +62,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public final class StandardFlowRegistryClientNode extends
AbstractComponentNode implements FlowRegistryClientNode {
+ private static final Logger logger =
LoggerFactory.getLogger(StandardFlowRegistryClientNode.class);
private final FlowManager flowManager;
private final Authorizable parent;
@@ -299,11 +304,7 @@ public final class StandardFlowRegistryClientNode extends
AbstractComponentNode
final VersionedFlowCoordinates coordinates =
group.getVersionedFlowCoordinates();
if (coordinates != null) {
- final String storageLocation = coordinates.getStorageLocation() ==
null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
- final String bucketId = coordinates.getBucketId();
- final String flowId = coordinates.getFlowId();
- final int version = coordinates.getVersion();
- final RegisteredFlowSnapshot snapshot =
getRegistryForInternalFlow(storageLocation).getFlowContents(context, bucketId,
flowId, version, true);
+ final RegisteredFlowSnapshot snapshot = fetchFlowContents(context,
coordinates, true);
final VersionedProcessGroup contents = snapshot.getFlowContents();
group.setVersionedFlowCoordinates(coordinates);
@@ -332,14 +333,37 @@ public final class StandardFlowRegistryClientNode extends
AbstractComponentNode
}
}
- private FlowRegistryClientNode getRegistryForInternalFlow(final String
storageLocation) throws FlowRegistryException, IOException {
- for (FlowRegistryClientNode registryClientNode :
flowManager.getAllFlowRegistryClients()) {
- if
(registryClientNode.isStorageLocationApplicable(storageLocation)) {
- return registryClientNode;
+ private RegisteredFlowSnapshot fetchFlowContents(final
FlowRegistryClientUserContext context, final VersionedFlowCoordinates
coordinates,
+ final boolean
fetchRemoteFlows) throws FlowRegistryException {
+
+ final String storageLocation = coordinates.getStorageLocation() ==
null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+ final String bucketId = coordinates.getBucketId();
+ final String flowId = coordinates.getFlowId();
+ final int version = coordinates.getVersion();
+
+ final List<FlowRegistryClientNode> clientNodes =
getRegistryClientsForInternalFlow(storageLocation);
+ for (final FlowRegistryClientNode clientNode : clientNodes) {
+ try {
+ logger.debug("Attempting to fetch flow for Bucket [{}] Flow
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+ final RegisteredFlowSnapshot snapshot =
clientNode.getFlowContents(context, bucketId, flowId, version,
fetchRemoteFlows);
+ coordinates.setRegistryId(clientNode.getIdentifier());
+
+ logger.debug("Successfully fetched flow for Bucket [{}] Flow
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+ return snapshot;
+ } catch (final Exception e) {
+ logger.debug("Failed to fetch flow", e);
}
}
- throw new FlowRegistryException(String.format("No applicable registry
found for storage location %s", storageLocation));
+ throw new FlowRegistryException(String.format("Could not find any
Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version
[%s] with Storage Location [%s]",
+ bucketId, flowId, version, storageLocation));
+ }
+
+ private List<FlowRegistryClientNode>
getRegistryClientsForInternalFlow(final String storageLocation) {
+ // Sort clients based on whether or not they believe they are
applicable for the given storage location
+ final List<FlowRegistryClientNode> matchingClients = new
ArrayList<>(flowManager.getAllFlowRegistryClients());
+ matchingClients.sort(Comparator.comparing(client ->
client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
+ return matchingClients;
}
private RegisteredFlowSnapshot createRegisteredFlowSnapshot(
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index 512bc3696e..b052af8110 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -72,9 +72,36 @@ public class FlowDifferenceFilters {
|| isNewRetryConfigWithDefaultValue(difference, flowManager)
|| isNewZIndexLabelConfigWithDefaultValue(difference, flowManager)
|| isNewZIndexConnectionConfigWithDefaultValue(difference,
flowManager)
+ || isRegistryUrlChange(difference)
|| isParameterContextChange(difference);
}
+ // The Registry URL may change if, for instance, a registry is moved to a
new host, or is made secure, the port changes, etc.
+ // Since this can be handled by the client anyway, there's no need to flag
this as a 'local modification'
+ private static boolean isRegistryUrlChange(final FlowDifference
difference) {
+ if (difference.getDifferenceType() !=
DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED) {
+ return false;
+ }
+ if (!(difference.getValueA() instanceof VersionedFlowCoordinates)) {
+ return false;
+ }
+ if (!(difference.getValueB() instanceof VersionedFlowCoordinates)) {
+ return false;
+ }
+
+ final VersionedFlowCoordinates coordinatesA =
(VersionedFlowCoordinates) difference.getValueA();
+ final VersionedFlowCoordinates coordinatesB =
(VersionedFlowCoordinates) difference.getValueB();
+
+ if (Objects.equals(coordinatesA.getBucketId(),
coordinatesB.getBucketId())
+ && Objects.equals(coordinatesA.getFlowId(),
coordinatesB.getFlowId())
+ && Objects.equals(coordinatesA.getVersion(),
coordinatesB.getVersion())) {
+
+ return true;
+ }
+
+ return false;
+ }
+
/**
* Predicate that returns true if the difference is NOT a name change on a
public port (i.e. VersionedPort that allows remote access).
*/