markap14 commented on code in PR #6736:
URL: https://github.com/apache/nifi/pull/6736#discussion_r1037128202


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final FlowRegistryClientUserCo
         }
     }
 
-    private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException {
-        for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) {
-            if (registryClientNode.isStorageLocationApplicable(storageLocation)) {
-                return registryClientNode;
+    private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
+                                                     final boolean fetchRemoteFlows) throws FlowRegistryException {
+
+        final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = coordinates.getVersion();
+
+        final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
+        for (final FlowRegistryClientNode clientNode : clientNodes) {
+            try {
+                logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
+                coordinates.setRegistryId(clientNode.getIdentifier());
+
+                logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                return snapshot;
+            } catch (final Exception e) {
+                logger.debug("Failed to fetch flow", e);
             }
         }
 
-        throw new FlowRegistryException(String.format("No applicable registry found for storage location %s", storageLocation));
+        throw new FlowRegistryException(String.format("Could not find any Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version [%s] with Storage Location [%s]",
+            bucketId, flowId, version, storageLocation));
+    }
+
+    private List<FlowRegistryClientNode> getRegistryClientsForInternalFlow(final String storageLocation) {
+        // Sort clients based on whether or not they believe they are applicable for the given storage location
+        final List<FlowRegistryClientNode> matchingClients = new ArrayList<>(flowManager.getAllFlowRegistryClients());
+        matchingClients.sort(Comparator.comparing(client -> client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
+        return matchingClients;

Review Comment:
   Interesting, I didn't consider the idea of buckets and flows having one-up 
IDs. I suppose it's possible. We could update the documentation to recommend 
against that.
   
   We need to check all registry clients, however, because a URL is not 
sufficient to know whether a given client is applicable. For example, if 
the port or hostname of the registry changes (as in the example in the Jira, 
where the registry went from non-secure to secure), we can suddenly no longer 
load any flow that has an 'inner versioned flow', simply because the URL is 
no longer accurate.
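
   To make the scenario concrete, here is a minimal, self-contained sketch of 
the "sort by applicability, then try everyone" idea. It is not the NiFi code: 
`FallbackFetchSketch`, its nested `Client` interface, the `client(...)` helper 
and the example URLs are all hypothetical stand-ins, and the prefix check used 
for applicability is only an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class FallbackFetchSketch {

    // Hypothetical stand-in for a registry client; not the FlowRegistryClientNode interface.
    interface Client {
        boolean isStorageLocationApplicable(String storageLocation);
        String fetch(String flowId) throws Exception; // throws if this client cannot serve the flow
    }

    // Tries clients that claim the storage location first, then falls back to all the others.
    static String fetchFromAny(final List<Client> clients, final String storageLocation, final String flowId) {
        final List<Client> ordered = new ArrayList<>(clients);
        // Key is false for applicable clients, and false sorts before true.
        // List.sort is stable, so the relative order within each group is preserved.
        ordered.sort(Comparator.comparing((Client c) -> !c.isStorageLocationApplicable(storageLocation)));

        for (final Client client : ordered) {
            try {
                return client.fetch(flowId);
            } catch (final Exception e) {
                // This client could not fetch the flow; move on to the next one.
            }
        }
        throw new IllegalStateException("No client could fetch flow " + flowId + " for " + storageLocation);
    }

    // Hypothetical helper: a client bound to a URL that either can or cannot serve flows.
    static Client client(final String name, final String url, final boolean canFetch) {
        return new Client() {
            @Override
            public boolean isStorageLocationApplicable(final String storageLocation) {
                return storageLocation.startsWith(url); // assumed applicability rule
            }

            @Override
            public String fetch(final String flowId) throws Exception {
                if (!canFetch) {
                    throw new Exception(name + " cannot fetch " + flowId);
                }
                return name + ":" + flowId;
            }
        };
    }

    public static void main(final String[] args) {
        // The flow was stored while the registry was non-secure; the only client now points at the secure URL.
        final List<Client> clients = Arrays.asList(client("secure", "https://registry:18443", true));
        System.out.println(fetchFromAny(clients, "http://registry:18080", "flow-1"));
    }
}
```

   In this sketch the only configured client does not consider the old 
non-secure location applicable, yet the fetch still succeeds because every 
client is tried. A lookup that returned only applicable clients would have 
failed here.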


