This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 0e920c43f6 NIFI-12918 Corrected Nested Versioned Flows for Stateless
0e920c43f6 is described below
commit 0e920c43f626a9c6b34f12e80f5b30b01a202bd9
Author: slambrose <[email protected]>
AuthorDate: Tue Mar 19 12:56:33 2024 -0500
NIFI-12918 Corrected Nested Versioned Flows for Stateless
This closes #8572
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/registry/flow/InMemoryFlowRegistry.java | 10 +-
.../apache/nifi/stateless/core/RegistryUtil.java | 33 ++++++-
.../nifi/stateless/core/TestRegistryUtil.java | 106 +++++++++++++++++++++
3 files changed, 146 insertions(+), 3 deletions(-)
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
index 945de1da3c..3cb2bf081e 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java
@@ -36,9 +36,17 @@ public class InMemoryFlowRegistry extends
AbstractFlowRegistryClient implements
private final Map<FlowCoordinates, List<VersionedExternalFlow>>
flowSnapshots = new ConcurrentHashMap<>();
+ /**
+ * Returns true regardless of the Flow Storage Location because this class
is the only Flow Registry Client configured for Stateless operation
+ *
+ * @param context Configuration context.
+ * @param location The location of versioned flow to check.
+ *
+ * @return true regardless of location
+ */
@Override
public boolean isStorageLocationApplicable(final
FlowRegistryClientConfigurationContext context, final String location) {
- return false;
+ return true;
}
@Override
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
index 1e0da7c559..cc265031d0 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
@@ -30,12 +30,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class RegistryUtil {
private static final Logger logger =
LoggerFactory.getLogger(RegistryUtil.class);
+ private static final Pattern REGISTRY_URL_PATTERN =
Pattern.compile("^(https?://.+?)/?nifi-registry-api.*$");
+
private final String registryUrl;
private NiFiRegistryClient registryClient;
private final SSLContext sslContext;
@@ -45,6 +50,11 @@ public class RegistryUtil {
this.sslContext = sslContext;
}
+    /**
+     * Creates a Registry Util around an already constructed Registry Client.
+     *
+     * @param registryClient Registry Client stored on this instance
+     * @param registryUrl Registry URL stored on this instance
+     * @param sslContext SSL Context stored on this instance
+     */
+    public RegistryUtil(final NiFiRegistryClient registryClient, final String registryUrl, final SSLContext sslContext) {
+        this.registryUrl = registryUrl;
+        this.sslContext = sslContext;
+        this.registryClient = registryClient;
+    }
public VersionedFlowSnapshot getFlowByID(String bucketID, String flowID)
throws IOException, NiFiRegistryException {
return getFlowByID(bucketID, flowID, -1);
@@ -122,6 +132,14 @@ public class RegistryUtil {
return flowSnapshot;
}
+    /**
+     * Derives the base Registry URL from the Storage Location of a versioned flow.
+     *
+     * @param storageLocation Storage Location from Versioned Flow Coordinates
+     * @return the captured group of REGISTRY_URL_PATTERN when the entire Storage Location matches it,
+     *         otherwise the Storage Location unchanged
+     */
+    protected String getBaseRegistryUrl(final String storageLocation) {
+        final Matcher locationMatcher = REGISTRY_URL_PATTERN.matcher(storageLocation);
+        // matches() requires the entire Storage Location to match before the captured group is used
+        return locationMatcher.matches() ? locationMatcher.group(1) : storageLocation;
+    }
private void populateVersionedContentsRecursively(final
VersionedProcessGroup group, final NiFiUser user) throws NiFiRegistryException,
IOException {
if (group == null) {
@@ -130,12 +148,12 @@ public class RegistryUtil {
final VersionedFlowCoordinates coordinates =
group.getVersionedFlowCoordinates();
if (coordinates != null) {
- final String registryUrl = coordinates.getRegistryUrl();
+ final String subRegistryUrl =
getBaseRegistryUrl(coordinates.getStorageLocation());
final String bucketId = coordinates.getBucketId();
final String flowId = coordinates.getFlowId();
final int version = coordinates.getVersion();
- final RegistryUtil subFlowUtil = new RegistryUtil(registryUrl,
sslContext);
+ final RegistryUtil subFlowUtil =
getSubRegistryUtil(subRegistryUrl);
final VersionedFlowSnapshot snapshot =
subFlowUtil.getFlowByID(bucketId, flowId, version);
final VersionedProcessGroup contents = snapshot.getFlowContents();
@@ -163,4 +181,15 @@ public class RegistryUtil {
populateVersionedContentsRecursively(child, user);
}
}
+
+    /**
+     * Returns the Registry Util used to retrieve a nested versioned flow.
+     *
+     * The current Registry Client is shared when the nested flow is located under the Registry URL of this
+     * instance; otherwise a new Registry Util is created for the nested Registry URL.
+     *
+     * @param subRegistryUrl base Registry URL of the nested versioned flow
+     * @return Registry Util for the nested versioned flow
+     */
+    private RegistryUtil getSubRegistryUtil(final String subRegistryUrl) {
+        if (isCurrentRegistry(subRegistryUrl)) {
+            // Share current Registry Client for matching Registry URL
+            return new RegistryUtil(registryClient, subRegistryUrl, sslContext);
+        }
+        return new RegistryUtil(subRegistryUrl, sslContext);
+    }
+
+    /**
+     * Determines whether the nested Registry URL is a prefix of the Registry URL of this instance that ends
+     * on a path boundary, so that partial matches such as a truncated port or host name are rejected.
+     *
+     * @param subRegistryUrl base Registry URL of the nested versioned flow
+     * @return true when the nested Registry URL identifies the same Registry as this instance
+     */
+    private boolean isCurrentRegistry(final String subRegistryUrl) {
+        if (registryUrl == null || !registryUrl.startsWith(subRegistryUrl)) {
+            return false;
+        }
+        final int prefixLength = subRegistryUrl.length();
+        // A plain prefix check would treat https://host:1844 as matching https://host:18443
+        return registryUrl.length() == prefixLength
+                || subRegistryUrl.endsWith("/")
+                || registryUrl.charAt(prefixLength) == '/';
+    }
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java
new file mode 100644
index 0000000000..ceb5ddaa30
--- /dev/null
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.core;
+
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestRegistryUtil {
+    private static final String BASE_REGISTRY_URL = "https://localhost:18443/context-path";
+
+    private static final String STORAGE_LOCATION_FORMAT = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+
+    // Coordinates of the root versioned flow
+    private static final String ROOT_BUCKET_ID = UUID.randomUUID().toString();
+    private static final String ROOT_FLOW_ID = UUID.randomUUID().toString();
+    private static final int ROOT_VERSION = 1;
+
+    // Coordinates of the versioned flow nested inside the root flow
+    private static final String CHILD_BUCKET_ID = UUID.randomUUID().toString();
+    private static final String CHILD_FLOW_ID = UUID.randomUUID().toString();
+    private static final int CHILD_VERSION = 2;
+
+    /**
+     * Verifies that the base Registry URL is derived from the Storage Location of the root and child Process Groups,
+     * then calls getFlowContents for the root coordinates and expects the stubbed root Snapshot to be returned.
+     */
+    @Test
+    public void testGetBaseRegistryUrl() throws NiFiRegistryException, IOException {
+        final FlowSnapshotClient snapshotClient = mock(FlowSnapshotClient.class);
+        final NiFiRegistryClient client = mock(NiFiRegistryClient.class);
+        when(client.getFlowSnapshotClient()).thenReturn(snapshotClient);
+
+        final RegistryUtil util = new RegistryUtil(client, BASE_REGISTRY_URL, null);
+
+        final VersionedProcessGroup childGroup = getChildVersionedProcessGroup();
+        final VersionedFlowSnapshot rootSnapshot = buildRootSnapshot(Collections.singleton(childGroup));
+
+        final String rootStorageLocation = rootSnapshot.getFlowContents().getVersionedFlowCoordinates().getStorageLocation();
+        assertEquals(BASE_REGISTRY_URL, util.getBaseRegistryUrl(rootStorageLocation));
+
+        final VersionedFlowSnapshot childSnapshot = new VersionedFlowSnapshot();
+        childSnapshot.setFlowContents(childGroup);
+
+        final String childStorageLocation = childGroup.getVersionedFlowCoordinates().getStorageLocation();
+        assertEquals(BASE_REGISTRY_URL, util.getBaseRegistryUrl(childStorageLocation));
+
+        // Both lookups are answered by the same mocked Flow Snapshot Client
+        when(snapshotClient.get(eq(ROOT_BUCKET_ID), eq(ROOT_FLOW_ID), eq(ROOT_VERSION))).thenReturn(rootSnapshot);
+        when(snapshotClient.get(eq(CHILD_BUCKET_ID), eq(CHILD_FLOW_ID), eq(CHILD_VERSION))).thenReturn(childSnapshot);
+
+        final VersionedFlowSnapshot retrieved = util.getFlowContents(ROOT_BUCKET_ID, ROOT_FLOW_ID, ROOT_VERSION, true, null);
+        assertEquals(rootSnapshot, retrieved);
+    }
+
+    /**
+     * Builds a Snapshot whose root Process Group has root coordinates and contains the given child groups.
+     */
+    private VersionedFlowSnapshot buildRootSnapshot(final Set<VersionedProcessGroup> childGroups) {
+        final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
+        rootGroup.setVersionedFlowCoordinates(buildCoordinates(ROOT_BUCKET_ID, ROOT_FLOW_ID, ROOT_VERSION));
+        rootGroup.setProcessGroups(childGroups);
+
+        final VersionedFlowSnapshot rootSnapshot = new VersionedFlowSnapshot();
+        rootSnapshot.setFlowContents(rootGroup);
+        return rootSnapshot;
+    }
+
+    /**
+     * Builds a Process Group that carries only the child coordinates.
+     */
+    private VersionedProcessGroup getChildVersionedProcessGroup() {
+        final VersionedProcessGroup childGroup = new VersionedProcessGroup();
+        childGroup.setVersionedFlowCoordinates(buildCoordinates(CHILD_BUCKET_ID, CHILD_FLOW_ID, CHILD_VERSION));
+        return childGroup;
+    }
+
+    /**
+     * Builds coordinates whose Storage Location is formatted from the base Registry URL and the given identifiers.
+     */
+    private VersionedFlowCoordinates buildCoordinates(final String bucketId, final String flowId, final int version) {
+        final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates();
+        coordinates.setStorageLocation(String.format(STORAGE_LOCATION_FORMAT, BASE_REGISTRY_URL, bucketId, flowId, version));
+        coordinates.setBucketId(bucketId);
+        coordinates.setFlowId(flowId);
+        coordinates.setVersion(version);
+        return coordinates;
+    }
+}
\ No newline at end of file