This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 0e920c43f6 NIFI-12918 Corrected Nested Versioned Flows for Stateless
0e920c43f6 is described below

commit 0e920c43f626a9c6b34f12e80f5b30b01a202bd9
Author: slambrose <[email protected]>
AuthorDate: Tue Mar 19 12:56:33 2024 -0500

    NIFI-12918 Corrected Nested Versioned Flows for Stateless
    
    This closes #8572
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/registry/flow/InMemoryFlowRegistry.java   |  10 +-
 .../apache/nifi/stateless/core/RegistryUtil.java   |  33 ++++++-
 .../nifi/stateless/core/TestRegistryUtil.java      | 106 +++++++++++++++++++++
 3 files changed, 146 insertions(+), 3 deletions(-)

diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
index 945de1da3c..3cb2bf081e 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
@@ -36,9 +36,17 @@ public class InMemoryFlowRegistry extends 
AbstractFlowRegistryClient implements
 
     private final Map<FlowCoordinates, List<VersionedExternalFlow>> 
flowSnapshots = new ConcurrentHashMap<>();
 
+    /**
+     * Returns true regardless of the Flow Storage Location because this class 
is the only Flow Registry Client configured for Stateless operation
+     *
+     * @param context Configuration context.
+     * @param location The location of versioned flow to check.
+     *
+     * @return true regardless of location
+     */
     @Override
     public boolean isStorageLocationApplicable(final 
FlowRegistryClientConfigurationContext context, final String location) {
-        return false;
+        return true;
     }
 
     @Override
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
index 1e0da7c559..cc265031d0 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
@@ -30,12 +30,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 public class RegistryUtil {
     private static final Logger logger = 
LoggerFactory.getLogger(RegistryUtil.class);
 
+    private static final Pattern REGISTRY_URL_PATTERN = 
Pattern.compile("^(https?://.+?)/?nifi-registry-api.*$");
+
     private final String registryUrl;
     private NiFiRegistryClient registryClient;
     private final SSLContext sslContext;
@@ -45,6 +50,11 @@ public class RegistryUtil {
         this.sslContext = sslContext;
     }
 
+    public RegistryUtil(final NiFiRegistryClient registryClient, final String 
registryUrl, final SSLContext sslContext) {
+        this.registryClient = registryClient;
+        this.registryUrl = registryUrl;
+        this.sslContext = sslContext;
+    }
 
     public VersionedFlowSnapshot getFlowByID(String bucketID, String flowID) 
throws IOException, NiFiRegistryException {
         return getFlowByID(bucketID, flowID, -1);
@@ -122,6 +132,14 @@ public class RegistryUtil {
         return flowSnapshot;
     }
 
+    protected String getBaseRegistryUrl(final String storageLocation) {
+        final Matcher matcher = REGISTRY_URL_PATTERN.matcher(storageLocation);
+        if (matcher.matches()) {
+            return matcher.group(1);
+        } else {
+            return storageLocation;
+        }
+    }
 
     private void populateVersionedContentsRecursively(final 
VersionedProcessGroup group, final NiFiUser user) throws NiFiRegistryException, 
IOException {
         if (group == null) {
@@ -130,12 +148,12 @@ public class RegistryUtil {
 
         final VersionedFlowCoordinates coordinates = 
group.getVersionedFlowCoordinates();
         if (coordinates != null) {
-            final String registryUrl = coordinates.getRegistryUrl();
+            final String subRegistryUrl = 
getBaseRegistryUrl(coordinates.getStorageLocation());
             final String bucketId = coordinates.getBucketId();
             final String flowId = coordinates.getFlowId();
             final int version = coordinates.getVersion();
 
-            final RegistryUtil subFlowUtil = new RegistryUtil(registryUrl, 
sslContext);
+            final RegistryUtil subFlowUtil = 
getSubRegistryUtil(subRegistryUrl);
             final VersionedFlowSnapshot snapshot = 
subFlowUtil.getFlowByID(bucketId, flowId, version);
             final VersionedProcessGroup contents = snapshot.getFlowContents();
 
@@ -163,4 +181,15 @@ public class RegistryUtil {
             populateVersionedContentsRecursively(child, user);
         }
     }
+
+    private RegistryUtil getSubRegistryUtil(final String subRegistryUrl) {
+        final RegistryUtil subRegistryUtil;
+        if (registryUrl.startsWith(subRegistryUrl)) {
+            // Share current Registry Client for matching Registry URL
+            subRegistryUtil = new RegistryUtil(registryClient, subRegistryUrl, 
sslContext);
+        } else {
+            subRegistryUtil = new RegistryUtil(subRegistryUrl, sslContext);
+        }
+        return subRegistryUtil;
+    }
 }
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java
new file mode 100644
index 0000000000..ceb5ddaa30
--- /dev/null
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.core;
+
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestRegistryUtil {
+    private static final String BASE_REGISTRY_URL = 
"https://localhost:18443/context-path";
+
+    private static final String STORAGE_LOCATION_FORMAT = 
"%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+
+    private static final String ROOT_BUCKET_ID = UUID.randomUUID().toString();
+    private static final String ROOT_FLOW_ID = UUID.randomUUID().toString();
+    private static final int ROOT_VERSION = 1;
+
+    private static final String CHILD_BUCKET_ID = UUID.randomUUID().toString();
+    private static final String CHILD_FLOW_ID = UUID.randomUUID().toString();
+    private static final int CHILD_VERSION = 2;
+
+    @Test
+    public void testGetBaseRegistryUrl() throws NiFiRegistryException, 
IOException {
+        final NiFiRegistryClient registryClient = 
mock(NiFiRegistryClient.class);
+        final RegistryUtil registryUtil = new RegistryUtil(registryClient, 
BASE_REGISTRY_URL, null);
+
+        final FlowSnapshotClient flowSnapshotClient = 
mock(FlowSnapshotClient.class);
+        
when(registryClient.getFlowSnapshotClient()).thenReturn(flowSnapshotClient);
+
+        final VersionedProcessGroup childVersionedProcessGroup = 
getChildVersionedProcessGroup();
+        final VersionedFlowSnapshot rootSnapshot = 
buildRootSnapshot(Collections.singleton(childVersionedProcessGroup));
+        final String rootRegistryUrl = 
registryUtil.getBaseRegistryUrl(rootSnapshot.getFlowContents().getVersionedFlowCoordinates().getStorageLocation());
+        assertEquals(BASE_REGISTRY_URL, rootRegistryUrl);
+
+        final VersionedFlowSnapshot childSnapshot = new 
VersionedFlowSnapshot();
+        childSnapshot.setFlowContents(childVersionedProcessGroup);
+        final String childRegistryUrl = 
registryUtil.getBaseRegistryUrl(childVersionedProcessGroup.getVersionedFlowCoordinates().getStorageLocation());
+        assertEquals(BASE_REGISTRY_URL, childRegistryUrl);
+
+        when(flowSnapshotClient.get(eq(ROOT_BUCKET_ID), eq(ROOT_FLOW_ID), 
eq(ROOT_VERSION))).thenReturn(rootSnapshot);
+        when(flowSnapshotClient.get(eq(CHILD_BUCKET_ID), eq(CHILD_FLOW_ID), 
eq(CHILD_VERSION))).thenReturn(childSnapshot);
+
+        final VersionedFlowSnapshot flowSnapshot = 
registryUtil.getFlowContents(ROOT_BUCKET_ID, ROOT_FLOW_ID, ROOT_VERSION, true, 
null);
+        assertEquals(rootSnapshot, flowSnapshot);
+    }
+
+    private VersionedFlowSnapshot buildRootSnapshot(final 
Set<VersionedProcessGroup> childGroups){
+        final String storageLocation = String.format(STORAGE_LOCATION_FORMAT, 
BASE_REGISTRY_URL, ROOT_BUCKET_ID, ROOT_FLOW_ID, ROOT_VERSION);
+        final VersionedFlowCoordinates coordinates = new 
VersionedFlowCoordinates();
+        coordinates.setStorageLocation(storageLocation);
+        coordinates.setBucketId(ROOT_BUCKET_ID);
+        coordinates.setFlowId(ROOT_FLOW_ID);
+        coordinates.setVersion(ROOT_VERSION);
+
+        final VersionedProcessGroup group = new VersionedProcessGroup();
+        group.setVersionedFlowCoordinates(coordinates);
+        group.setProcessGroups(childGroups);
+
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setFlowContents(group);
+        return snapshot;
+    }
+
+    private VersionedProcessGroup getChildVersionedProcessGroup() {
+        final String storageLocation = String.format(STORAGE_LOCATION_FORMAT, 
BASE_REGISTRY_URL, CHILD_BUCKET_ID, CHILD_FLOW_ID, CHILD_VERSION);
+        final VersionedFlowCoordinates coordinates = new 
VersionedFlowCoordinates();
+        coordinates.setStorageLocation(storageLocation);
+        coordinates.setBucketId(CHILD_BUCKET_ID);
+        coordinates.setFlowId(CHILD_FLOW_ID);
+        coordinates.setVersion(CHILD_VERSION);
+
+        final VersionedProcessGroup group = new VersionedProcessGroup();
+        group.setVersionedFlowCoordinates(coordinates);
+
+        return group;
+    }
+}
\ No newline at end of file

Reply via email to