markap14 commented on code in PR #6736:
URL: https://github.com/apache/nifi/pull/6736#discussion_r1037128202
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final FlowRegistryClientUserCo
         }
     }
-    private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException {
-        for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) {
-            if (registryClientNode.isStorageLocationApplicable(storageLocation)) {
-                return registryClientNode;
+    private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
+                                                     final boolean fetchRemoteFlows) throws FlowRegistryException {
+
+        final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = coordinates.getVersion();
+
+        final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
+        for (final FlowRegistryClientNode clientNode : clientNodes) {
+            try {
+                logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
+                coordinates.setRegistryId(clientNode.getIdentifier());
+
+                logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                return snapshot;
+            } catch (final Exception e) {
+                logger.debug("Failed to fetch flow", e);
             }
         }
-        throw new FlowRegistryException(String.format("No applicable registry found for storage location %s", storageLocation));
+        throw new FlowRegistryException(String.format("Could not find any Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version [%s] with Storage Location [%s]",
+            bucketId, flowId, version, storageLocation));
+    }
+
+    private List<FlowRegistryClientNode> getRegistryClientsForInternalFlow(final String storageLocation) {
+        // Sort clients based on whether or not they believe they are applicable for the given storage location
+        final List<FlowRegistryClientNode> matchingClients = new ArrayList<>(flowManager.getAllFlowRegistryClients());
+        matchingClients.sort(Comparator.comparing(client -> client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
+        return matchingClients;
Review Comment:
Interesting, I didn't consider the idea of buckets and flows having one-up
IDs. I suppose it's possible. We could update the documentation to recommend
against that.
We need to check all registry clients, however, because a URL is not
sufficient to know whether or not a given client is applicable. For example, if
the port or hostname of the registry changes (as in the example in the Jira,
when the registry went from being non-secure to secure) all of a sudden we can
no longer load any flow that has an 'inner versioned flow' simply because the
URL is no longer accurate.
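
To make the scenario concrete, here is a minimal, self-contained sketch of the "prefer applicable clients, but fall back to the rest" idea. It is not the NiFi code: `Client`, `StubClient`, `orderByApplicability`, `fetchFromAny`, and the URLs are hypothetical stand-ins, and the URL-prefix check is only an assumption about how applicability might be decided.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class RegistryFallbackSketch {

    /** Hypothetical stand-in for a registry client; not NiFi's FlowRegistryClientNode. */
    interface Client {
        String name();
        boolean isStorageLocationApplicable(String storageLocation);
        String fetch(String flowId) throws Exception;
    }

    /** Stub: applicability is a URL-prefix check (an assumption); fetch succeeds only for known flows. */
    static final class StubClient implements Client {
        private final String name;
        private final String url;
        private final List<String> knownFlows;

        StubClient(String name, String url, String... knownFlows) {
            this.name = name;
            this.url = url;
            this.knownFlows = Arrays.asList(knownFlows);
        }

        public String name() { return name; }

        public boolean isStorageLocationApplicable(String storageLocation) {
            return storageLocation != null && storageLocation.startsWith(url);
        }

        public String fetch(String flowId) throws Exception {
            if (!knownFlows.contains(flowId)) {
                throw new Exception(name + " does not hold " + flowId);
            }
            return flowId + "@" + name;
        }
    }

    /** Applicable clients first; List.sort is stable, so the original order is kept within each group. */
    static List<Client> orderByApplicability(List<Client> clients, String storageLocation) {
        List<Client> ordered = new ArrayList<>(clients);
        ordered.sort(Comparator.comparingInt(
                (Client c) -> c.isStorageLocationApplicable(storageLocation) ? 0 : 1));
        return ordered;
    }

    /** Try every client in preference order and return the first successful fetch, if any. */
    static Optional<String> fetchFromAny(List<Client> clients, String storageLocation, String flowId) {
        for (Client client : orderByApplicability(clients, storageLocation)) {
            try {
                return Optional.of(client.fetch(flowId));
            } catch (Exception e) {
                // This client could not serve the flow; move on to the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Client> clients = Arrays.asList(
                new StubClient("other", "http://other:18080"),
                new StubClient("secured", "https://registry:18443", "flow-1"));
        // The flow was saved while the registry was still non-secure, so the stored URL is stale.
        String staleLocation = "http://registry:18080";
        System.out.println(fetchFromAny(clients, staleLocation, "flow-1"));
    }
}
```

In this sketch no client considers the stale location applicable, so a strict URL match would fail outright, while the fallback still finds the flow through the client now pointing at the secured registry. The trade-off is the one raised above: if two registries could hold the same bucket and flow IDs, the fallback might return a flow from the wrong one.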