simonbence commented on code in PR #6736:
URL: https://github.com/apache/nifi/pull/6736#discussion_r1037010206
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final
FlowRegistryClientUserCo
}
}
- private FlowRegistryClientNode getRegistryForInternalFlow(final String
storageLocation) throws FlowRegistryException, IOException {
- for (FlowRegistryClientNode registryClientNode :
flowManager.getAllFlowRegistryClients()) {
- if
(registryClientNode.isStorageLocationApplicable(storageLocation)) {
- return registryClientNode;
+ private RegisteredFlowSnapshot fetchFlowContents(final
FlowRegistryClientUserContext context, final VersionedFlowCoordinates
coordinates,
+ final boolean
fetchRemoteFlows) throws FlowRegistryException {
+
+ final String storageLocation = coordinates.getStorageLocation() ==
null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+ final String bucketId = coordinates.getBucketId();
+ final String flowId = coordinates.getFlowId();
+ final int version = coordinates.getVersion();
+
+ final List<FlowRegistryClientNode> clientNodes =
getRegistryClientsForInternalFlow(storageLocation);
+ for (final FlowRegistryClientNode clientNode : clientNodes) {
+ try {
+ logger.debug("Attempting to fetch flow for Bucket [{}] Flow
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+ final RegisteredFlowSnapshot snapshot =
clientNode.getFlowContents(context, bucketId, flowId, version,
fetchRemoteFlows);
+ coordinates.setRegistryId(clientNode.getIdentifier());
+
+ logger.debug("Successfully fetched flow for Bucket [{}] Flow
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+ return snapshot;
+ } catch (final Exception e) {
+ logger.debug("Failed to fetch flow", e);
Review Comment:
It would worth to provide detail about the registry and flow
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final
FlowRegistryClientUserCo
}
}
- private FlowRegistryClientNode getRegistryForInternalFlow(final String
storageLocation) throws FlowRegistryException, IOException {
- for (FlowRegistryClientNode registryClientNode :
flowManager.getAllFlowRegistryClients()) {
- if
(registryClientNode.isStorageLocationApplicable(storageLocation)) {
- return registryClientNode;
+ private RegisteredFlowSnapshot fetchFlowContents(final
FlowRegistryClientUserContext context, final VersionedFlowCoordinates
coordinates,
+ final boolean
fetchRemoteFlows) throws FlowRegistryException {
+
+ final String storageLocation = coordinates.getStorageLocation() ==
null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+ final String bucketId = coordinates.getBucketId();
+ final String flowId = coordinates.getFlowId();
+ final int version = coordinates.getVersion();
+
+ final List<FlowRegistryClientNode> clientNodes =
getRegistryClientsForInternalFlow(storageLocation);
+ for (final FlowRegistryClientNode clientNode : clientNodes) {
+ try {
+ logger.debug("Attempting to fetch flow for Bucket [{}] Flow
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+ final RegisteredFlowSnapshot snapshot =
clientNode.getFlowContents(context, bucketId, flowId, version,
fetchRemoteFlows);
+ coordinates.setRegistryId(clientNode.getIdentifier());
+
+ logger.debug("Successfully fetched flow for Bucket [{}] Flow
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+ return snapshot;
+ } catch (final Exception e) {
+ logger.debug("Failed to fetch flow", e);
}
}
- throw new FlowRegistryException(String.format("No applicable registry
found for storage location %s", storageLocation));
+ throw new FlowRegistryException(String.format("Could not find any
Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version
[%s] with Storage Location [%s]",
+ bucketId, flowId, version, storageLocation));
+ }
+
+ private List<FlowRegistryClientNode>
getRegistryClientsForInternalFlow(final String storageLocation) {
+ // Sort clients based on whether or not they believe they are
applicable for the given storage location
+ final List<FlowRegistryClientNode> matchingClients = new
ArrayList<>(flowManager.getAllFlowRegistryClients());
+ matchingClients.sort(Comparator.comparing(client ->
client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
+ return matchingClients;
Review Comment:
Have you considered using the isStorageLocationApplicable as filter instead
for an extra safeguard? (Like it would try not every clients but every clients
returning true for that call). My reasoning: identifiers of flows and buckets
are generated by the registry implementations, which might differ case by case.
It is possible that the ids are for example sequentials and different flows
might exist with the same bucket and flow id, but without other relation? Like
in this case if the bucket id is 1 and flow id is 1, and there are multiple
registry instances with an implementation like this, the first will be loaded.
Which in this case not necessary a flow which "matches" the storage location,
might result an unexpected flow to be loaded
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]