This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 61b864e9c0 NIFI-15190 Decoupled Registry Client module from Framework modules
61b864e9c0 is described below
commit 61b864e9c0aa621160c23ef2686740c93725f4e6
Author: exceptionfactory <[email protected]>
AuthorDate: Thu Nov 6 15:11:00 2025 -0600
NIFI-15190 Decoupled Registry Client module from Framework modules
- Refactored Extension Registry implementation and Flow Snapshot Provider using Java HttpClient class
- Added unit tests for Extension Registry and Snapshot Provider
- Added direct dependencies on Jersey HK2 and Media libraries required for NiFi Web API
Signed-off-by: Pierre Villard <[email protected]>
This closes #10502.
---
.../nifi-framework-nar-bom/pom.xml | 28 ++-
nifi-framework-bundle/nifi-framework-nar/pom.xml | 20 +-
.../nifi-framework/nifi-framework-cluster/pom.xml | 5 +
.../nifi-framework-components/pom.xml | 20 +-
.../extension/NiFiRegistryExtensionRegistry.java | 127 ++++++++-----
.../nifi/remote/StandardRemoteProcessGroup.java | 6 +-
.../NiFiRegistryExtensionRegistryTest.java | 165 +++++++++++++++++
.../nifi-framework/nifi-web/nifi-web-api/pom.xml | 11 +-
.../java/org/apache/nifi/web/api/FlowResource.java | 3 +-
.../apache/nifi/web/api/ProcessGroupResource.java | 3 +-
.../nifi-web/nifi-web-security/pom.xml | 5 -
.../nifi-stateless-engine/pom.xml | 10 +-
.../config/PropertiesFileFlowDefinitionParser.java | 12 +-
.../nifi/stateless/core/FlowSnapshotProvider.java | 37 ++++
.../core/RegistryFlowSnapshotProvider.java | 204 +++++++++++++++++++++
.../apache/nifi/stateless/core/RegistryUtil.java | 191 -------------------
.../core/RegistryFlowSnapshotProviderTest.java | 146 +++++++++++++++
.../nifi/stateless/core/TestRegistryUtil.java | 106 -----------
18 files changed, 699 insertions(+), 400 deletions(-)
diff --git a/nifi-framework-bundle/nifi-framework-nar-bom/pom.xml
b/nifi-framework-bundle/nifi-framework-nar-bom/pom.xml
index 9223f513af..2203fdf81d 100644
--- a/nifi-framework-bundle/nifi-framework-nar-bom/pom.xml
+++ b/nifi-framework-bundle/nifi-framework-nar-bom/pom.xml
@@ -45,6 +45,24 @@
<version>${jersey.bom.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ <version>${jersey.bom.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-json-jackson</artifactId>
+ <version>${jersey.bom.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ <version>${jersey.bom.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core</artifactId>
@@ -141,12 +159,6 @@
<version>2.7.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.nifi.registry</groupId>
- <artifactId>nifi-registry-security-utils</artifactId>
- <version>2.7.0-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-cert</artifactId>
@@ -282,10 +294,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties-loader</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.nifi.registry</groupId>
- <artifactId>nifi-registry-security-utils</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-cert</artifactId>
diff --git a/nifi-framework-bundle/nifi-framework-nar/pom.xml
b/nifi-framework-bundle/nifi-framework-nar/pom.xml
index d5ef9e5f51..45f6daf4bb 100644
--- a/nifi-framework-bundle/nifi-framework-nar/pom.xml
+++ b/nifi-framework-bundle/nifi-framework-nar/pom.xml
@@ -35,6 +35,21 @@
<artifactId>jersey-server</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-json-jackson</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
@@ -120,11 +135,6 @@
<artifactId>nifi-properties-loader</artifactId>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>org.apache.nifi.registry</groupId>
- <artifactId>nifi-registry-security-utils</artifactId>
- <scope>compile</scope>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-cert</artifactId>
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index f4e9e1bce5..3415fa8033 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -199,6 +199,11 @@
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-common</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/pom.xml
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/pom.xml
index 5115cdbc3e..9fa2a7ea01 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/pom.xml
@@ -36,6 +36,11 @@
<artifactId>nifi-utils</artifactId>
<version>2.7.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security-proxied-entity</artifactId>
+ <version>2.7.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId>
@@ -49,7 +54,6 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -122,11 +126,6 @@
<artifactId>nifi-xml-processing</artifactId>
<version>2.7.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.nifi.registry</groupId>
- <artifactId>nifi-registry-client</artifactId>
- <version>2.7.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>org.apache.nifi.registry</groupId>
<artifactId>nifi-registry-flow-diff</artifactId>
@@ -141,14 +140,15 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-python-framework-api</artifactId>
</dependency>
- <dependency>
- <groupId>jakarta.ws.rs</groupId>
- <artifactId>jakarta.ws.rs-api</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver3</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/extension/NiFiRegistryExtensionRegistry.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/extension/NiFiRegistryExtensionRegistry.java
index 681bbf69b8..6816cde238 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/extension/NiFiRegistryExtensionRegistry.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/extension/NiFiRegistryExtensionRegistry.java
@@ -16,21 +16,22 @@
*/
package org.apache.nifi.registry.extension;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.registry.client.BundleVersionClient;
-import org.apache.nifi.registry.client.NiFiRegistryClient;
-import org.apache.nifi.registry.client.NiFiRegistryClientConfig;
-import org.apache.nifi.registry.client.NiFiRegistryException;
-import org.apache.nifi.registry.client.RequestConfig;
-import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
-import org.apache.nifi.registry.client.impl.request.ProxiedEntityRequestConfig;
-import org.apache.nifi.registry.extension.bundle.BundleVersionFilterParams;
import org.apache.nifi.registry.extension.bundle.BundleVersionMetadata;
+import org.apache.nifi.security.proxied.entity.StandardProxiedEntityEncoder;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
-import java.util.List;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
@@ -39,33 +40,39 @@ import java.util.stream.Collectors;
*/
public class NiFiRegistryExtensionRegistry extends
AbstractExtensionRegistry<NiFiRegistryExtensionBundleMetadata> {
- private NiFiRegistryClient registryClient;
+ private static final Duration TIMEOUT = Duration.ofSeconds(30);
+
+ private static final String FORWARD_SLASH = "/";
+
+ private static final String PROXIED_ENTITIES_CHAIN_HEADER =
"X-ProxiedEntitiesChain";
+
+ private static final ObjectMapper objectMapper = new
ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+
+ private HttpClient httpClient;
public NiFiRegistryExtensionRegistry(final String identifier, final String
url, final String name, final SSLContext sslContext) {
super(identifier, url, name, sslContext);
}
- private synchronized NiFiRegistryClient getRegistryClient() {
- if (registryClient != null) {
- return registryClient;
+ private synchronized HttpClient getConfiguredHttpClient() {
+ if (httpClient != null) {
+ return httpClient;
}
- final NiFiRegistryClientConfig config = new
NiFiRegistryClientConfig.Builder()
- .connectTimeout(30000)
- .readTimeout(30000)
- .sslContext(getSSLContext())
- .baseUrl(getURL())
- .build();
+ final HttpClient.Builder builder = HttpClient.newBuilder();
+ builder.connectTimeout(TIMEOUT);
- registryClient = new JerseyNiFiRegistryClient.Builder()
- .config(config)
- .build();
+ final SSLContext sslContext = getSSLContext();
+ if (sslContext != null) {
+ builder.sslContext(sslContext);
+ }
+ httpClient = builder.build();
- return registryClient;
+ return httpClient;
}
private synchronized void invalidateClient() {
- this.registryClient = null;
+ this.httpClient = null;
}
@Override
@@ -75,41 +82,63 @@ public class NiFiRegistryExtensionRegistry extends
AbstractExtensionRegistry<NiF
}
@Override
- public Set<NiFiRegistryExtensionBundleMetadata>
getExtensionBundleMetadata(final NiFiUser user)
- throws IOException, ExtensionRegistryException {
- final RequestConfig requestConfig = getRequestConfig(user);
- final NiFiRegistryClient registryClient = getRegistryClient();
- final BundleVersionClient bundleVersionClient =
registryClient.getBundleVersionClient(requestConfig);
+ public Set<NiFiRegistryExtensionBundleMetadata>
getExtensionBundleMetadata(final NiFiUser user) throws IOException,
ExtensionRegistryException {
+ final URI versionsUri = getUri("bundles/versions");
+ final HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(versionsUri);
- try {
- final List<BundleVersionMetadata> bundleVersions =
bundleVersionClient.getBundleVersions(BundleVersionFilterParams.empty());
- return bundleVersions.stream().map(bv ->
map(bv)).collect(Collectors.toSet());
- } catch (final NiFiRegistryException nre) {
- throw new ExtensionRegistryException(nre.getMessage(), nre);
+ try (InputStream inputStream = sendRequest(requestBuilder, user)) {
+ final BundleVersionMetadata[] bundleVersions =
objectMapper.readValue(inputStream, BundleVersionMetadata[].class);
+ return
Arrays.stream(bundleVersions).map(this::map).collect(Collectors.toSet());
}
}
@Override
- public InputStream getExtensionBundleContent(final NiFiUser user, final
NiFiRegistryExtensionBundleMetadata bundleMetadata)
- throws IOException, ExtensionRegistryException {
- final RequestConfig requestConfig = getRequestConfig(user);
- final NiFiRegistryClient registryClient = getRegistryClient();
- final BundleVersionClient bundleVersionClient =
registryClient.getBundleVersionClient(requestConfig);
+ public InputStream getExtensionBundleContent(final NiFiUser user, final
NiFiRegistryExtensionBundleMetadata bundleMetadata) throws
ExtensionRegistryException {
+ final String bundleId = bundleMetadata.getBundleIdentifier();
+ final String version = bundleMetadata.getVersion();
+ final URI versionsUri =
getUri("bundles/%s/versions/%s/content".formatted(bundleId, version));
+ final HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(versionsUri);
- try {
- return
bundleVersionClient.getBundleVersionContent(bundleMetadata.getBundleIdentifier(),
bundleMetadata.getVersion());
- } catch (NiFiRegistryException nre) {
- throw new ExtensionRegistryException(nre.getMessage(), nre);
- }
+ return sendRequest(requestBuilder, user);
}
- private RequestConfig getRequestConfig(final NiFiUser user) {
- final String identity = getIdentity(user);
- return identity == null ? null : new
ProxiedEntityRequestConfig(identity);
+ private InputStream sendRequest(final HttpRequest.Builder requestBuilder,
final NiFiUser user) throws ExtensionRegistryException {
+ final HttpClient configuredHttpClient = getConfiguredHttpClient();
+
+ if (user != null && !user.isAnonymous()) {
+ final String identity = user.getIdentity();
+ final String proxiedEntities =
StandardProxiedEntityEncoder.getInstance().getEncodedEntity(identity);
+ requestBuilder.setHeader(PROXIED_ENTITIES_CHAIN_HEADER,
proxiedEntities);
+ }
+
+ requestBuilder.timeout(TIMEOUT);
+ final HttpRequest request = requestBuilder.build();
+
+ final URI uri = request.uri();
+ try {
+ final HttpResponse<InputStream> response =
configuredHttpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
+ final int statusCode = response.statusCode();
+ if (HttpURLConnection.HTTP_OK == statusCode) {
+ return response.body();
+ } else {
+ throw new ExtensionRegistryException("Registry request failed
with HTTP %d [%s]".formatted(statusCode, uri));
+ }
+ } catch (final IOException e) {
+ throw new ExtensionRegistryException("Registry request failed
[%s]".formatted(uri), e);
+ } catch (final InterruptedException e) {
+ throw new ExtensionRegistryException("Registry requested
interrupted [%s]".formatted(uri), e);
+ }
}
- private String getIdentity(final NiFiUser user) {
- return (user == null || user.isAnonymous()) ? null :
user.getIdentity();
+ private URI getUri(final String path) {
+ final StringBuilder builder = new StringBuilder();
+ final String baseUrl = getURL();
+ builder.append(baseUrl);
+ if (!baseUrl.endsWith(FORWARD_SLASH)) {
+ builder.append(FORWARD_SLASH);
+ }
+ builder.append(path);
+ return URI.create(builder.toString());
}
private NiFiRegistryExtensionBundleMetadata map(final
BundleVersionMetadata bundleVersionMetadata) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 1a744115f5..23ce5ad143 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -50,8 +50,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
-import jakarta.ws.rs.core.Response;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
@@ -92,8 +92,8 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
private static final Logger logger =
LoggerFactory.getLogger(StandardRemoteProcessGroup.class);
// status codes
- private static final int UNAUTHORIZED_STATUS_CODE =
Response.Status.UNAUTHORIZED.getStatusCode();
- private static final int FORBIDDEN_STATUS_CODE =
Response.Status.FORBIDDEN.getStatusCode();
+ private static final int UNAUTHORIZED_STATUS_CODE =
HttpURLConnection.HTTP_UNAUTHORIZED;
+ private static final int FORBIDDEN_STATUS_CODE =
HttpURLConnection.HTTP_FORBIDDEN;
private final String id;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/extension/NiFiRegistryExtensionRegistryTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/extension/NiFiRegistryExtensionRegistryTest.java
new file mode 100644
index 0000000000..e5fa3d5a51
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/extension/NiFiRegistryExtensionRegistryTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.extension;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import mockwebserver3.RecordedRequest;
+import okhttp3.Headers;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.StandardNiFiUser;
+import org.apache.nifi.registry.extension.bundle.BundleVersionMetadata;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
+class NiFiRegistryExtensionRegistryTest {
+ private static final int REQUEST_TIMEOUT = 2;
+
+ private static final String IDENTIFIER =
NiFiRegistryExtensionRegistryTest.class.getSimpleName();
+
+ private static final String API_PATH = "/nifi-registry-api";
+
+ private static final String VERSIONS_PATH =
"/nifi-registry-api/bundles/versions";
+
+ private static final String IDENTITY = "username";
+
+ private static final NiFiUser USER = new
StandardNiFiUser.Builder().identity(IDENTITY).build();
+
+ private static final String PROXIED_ENTITIES_HEADER =
"X-ProxiedEntitiesChain";
+
+ private static final String EXPECTED_PROXIED_ENTITIES = "<username>";
+
+ private static final String BUNDLE_CONTENT_LOCATION =
"org.apache.nifi::nifi-extension-nar::2.0.0::nifi-extension";
+
+ private static final String CONTENT_PATH =
"/nifi-registry-api/bundles/nifi-extension/versions/2.0.0/content";
+
+ private static final String CONTENT_TYPE_HEADER = "Content-Type";
+
+ private static final String APPLICATION_JSON = "application/json";
+
+ private static final String EMPTY_ARRAY = "[]";
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ private MockWebServer mockWebServer;
+
+ private NiFiRegistryExtensionRegistry registry;
+
+ @BeforeEach
+ void startServer() throws IOException {
+ mockWebServer = new MockWebServer();
+ mockWebServer.start();
+
+ final String url = mockWebServer.url(API_PATH).toString();
+
+ registry = new NiFiRegistryExtensionRegistry(IDENTIFIER, url,
IDENTIFIER, null);
+ }
+
+ @AfterEach
+ void shutdownServer() {
+ mockWebServer.close();
+ }
+
+ @Test
+ void testGetExtensionBundleMetadata() throws Exception {
+ final MockResponse response = new MockResponse.Builder()
+ .code(HttpURLConnection.HTTP_OK)
+ .body(EMPTY_ARRAY)
+ .build();
+ mockWebServer.enqueue(response);
+
+ final Set<NiFiRegistryExtensionBundleMetadata> metadata =
registry.getExtensionBundleMetadata(null);
+
+ assertNotNull(metadata);
+ assertRequestRecorded(VERSIONS_PATH);
+ }
+
+ @Test
+ void testGetExtensionBundleMetadataProxyUser() throws Exception {
+ final NiFiRegistryExtensionBundleMetadata metadataExpected =
NiFiRegistryExtensionBundleMetadata.fromLocationString(BUNDLE_CONTENT_LOCATION)
+ .registryIdentifier(IDENTIFIER)
+ .build();
+
+ final BundleVersionMetadata versionMetadata = new
BundleVersionMetadata();
+ versionMetadata.setGroupId(metadataExpected.getGroup());
+ versionMetadata.setVersion(metadataExpected.getVersion());
+ versionMetadata.setArtifactId(metadataExpected.getArtifact());
+ versionMetadata.setBundleId(metadataExpected.getBundleIdentifier());
+ final String bundleMetadataBody =
objectMapper.writeValueAsString(List.of(versionMetadata));
+
+ final MockResponse response = new MockResponse.Builder()
+ .code(HttpURLConnection.HTTP_OK)
+ .setHeader(CONTENT_TYPE_HEADER, APPLICATION_JSON)
+ .body(bundleMetadataBody)
+ .build();
+ mockWebServer.enqueue(response);
+
+ final Set<NiFiRegistryExtensionBundleMetadata> metadataFound =
registry.getExtensionBundleMetadata(USER);
+
+ assertNotNull(metadataFound);
+ final NiFiRegistryExtensionBundleMetadata bundleMetadataFound =
metadataFound.iterator().next();
+ assertEquals(metadataExpected, bundleMetadataFound);
+
+ final RecordedRequest request = assertRequestRecorded(VERSIONS_PATH);
+ assertProxiedEntitiesMatched(request);
+ }
+
+ @Test
+ void testGetExtensionBundleContent() throws Exception {
+ final MockResponse response = new MockResponse.Builder()
+ .code(HttpURLConnection.HTTP_OK)
+ .build();
+ mockWebServer.enqueue(response);
+
+ final NiFiRegistryExtensionBundleMetadata metadata =
NiFiRegistryExtensionBundleMetadata.fromLocationString(BUNDLE_CONTENT_LOCATION)
+ .registryIdentifier(IDENTIFIER)
+ .build();
+ final InputStream bundleContent =
registry.getExtensionBundleContent(USER, metadata);
+
+ assertNotNull(bundleContent);
+ final RecordedRequest request = assertRequestRecorded(CONTENT_PATH);
+ assertProxiedEntitiesMatched(request);
+ }
+
+ private void assertProxiedEntitiesMatched(final RecordedRequest request) {
+ final Headers headers = request.getHeaders();
+ final String proxiedEntities = headers.get(PROXIED_ENTITIES_HEADER);
+ assertEquals(EXPECTED_PROXIED_ENTITIES, proxiedEntities);
+ }
+
+ private RecordedRequest assertRequestRecorded(final String
encodedPathExpected) throws InterruptedException {
+ final RecordedRequest request =
mockWebServer.takeRequest(REQUEST_TIMEOUT, TimeUnit.SECONDS);
+ assertNotNull(request);
+ final String encodedPath = request.getUrl().encodedPath();
+ assertEquals(encodedPathExpected, encodedPath);
+ return request;
+ }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
index 49571fcf25..f745c078de 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
@@ -241,12 +241,6 @@
<version>2.7.0-SNAPSHOT</version>
<scope>provided</scope> <!-- expected to be provided by parent
classloader -->
</dependency>
- <dependency>
- <groupId>org.apache.nifi.registry</groupId>
- <artifactId>nifi-registry-client</artifactId>
- <version>2.7.0-SNAPSHOT</version>
- <scope>provided</scope> <!-- expected to be provided by parent
classloader -->
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core-api</artifactId>
@@ -281,6 +275,11 @@
<artifactId>jersey-common</artifactId>
<scope>provided</scope> <!-- expected to be provided by parent
classloader -->
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ <scope>provided</scope> <!-- expected to be provided by parent
classloader -->
+ </dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index e80ad7f118..71994f9289 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -73,7 +73,6 @@ import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.NarClassLoadersHolder;
-import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowVersionLocation;
import org.apache.nifi.ui.extension.contentviewer.ContentViewer;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
@@ -2439,7 +2438,7 @@ public class FlowResource extends ApplicationResource {
description = "The registry id.",
required = true
)
- @PathParam("id") String id) throws NiFiRegistryException {
+ @PathParam("id") String id) {
authorizeFlow();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 308b9520f2..51bfc22068 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -68,7 +68,6 @@ import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.groups.VersionedComponentAdditions;
import org.apache.nifi.parameter.ParameterContext;
-import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryBucket;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
@@ -435,7 +434,7 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
)
public Response getLocalModifications(
@Parameter(description = "The process group id.")
- @PathParam("id") final String groupId) throws IOException,
NiFiRegistryException {
+ @PathParam("id") final String groupId) throws IOException {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/pom.xml
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/pom.xml
index ea205eea13..9de903543b 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/pom.xml
@@ -59,11 +59,6 @@
<artifactId>nifi-deprecation-log</artifactId>
<version>2.7.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.nifi.registry</groupId>
- <artifactId>nifi-registry-security-utils</artifactId>
- <version>2.7.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils-api</artifactId>
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
index 9e5cc746bd..be182e2e21 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
@@ -52,11 +52,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.nifi.registry</groupId>
- <artifactId>nifi-registry-client</artifactId>
- <version>2.7.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-data-provenance-utils</artifactId>
@@ -137,6 +132,11 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver3</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
index 425715ebd4..137ba9f203 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
@@ -22,10 +22,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.registry.VersionedFlowConverter;
-import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
-import org.apache.nifi.stateless.core.RegistryUtil;
+import org.apache.nifi.stateless.core.FlowSnapshotProvider;
+import org.apache.nifi.stateless.core.RegistryFlowSnapshotProvider;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
@@ -540,7 +540,7 @@ public class PropertiesFileFlowDefinitionParser implements
DataflowDefinitionPar
try {
final SSLContext sslContext =
SslConfigurationUtil.createSslContext(sslContextDefinition);
return fetchFlowFromRegistry(registryUrl, bucketId, flowId,
flowVersion, sslContext);
- } catch (final NiFiRegistryException e) {
+ } catch (final IOException e) {
throw new StatelessConfigurationException("Could not fetch flow
from Registry", e);
}
}
@@ -597,13 +597,13 @@ public class PropertiesFileFlowDefinitionParser
implements DataflowDefinitionPar
}
private VersionedFlowSnapshot fetchFlowFromRegistry(final String
registryUrl, final String bucketId, final String flowId, final Integer
flowVersion,
- final SSLContext
sslContext) throws IOException, NiFiRegistryException {
+ final SSLContext
sslContext) throws IOException {
logger.info("Fetching flow from NiFi Registry at {}", registryUrl);
final long start = System.currentTimeMillis();
- final RegistryUtil registryUtil = new RegistryUtil(registryUrl,
sslContext);
- final VersionedFlowSnapshot snapshot =
registryUtil.getFlowByID(bucketId, flowId, flowVersion == null ? -1 :
flowVersion);
+ final FlowSnapshotProvider flowSnapshotProvider = new
RegistryFlowSnapshotProvider(registryUrl, sslContext);
+ final VersionedFlowSnapshot snapshot =
flowSnapshotProvider.getFlowSnapshot(bucketId, flowId, flowVersion == null ? -1
: flowVersion);
final long millis = System.currentTimeMillis() - start;
logger.info("Successfully fetched flow from NiFi Registry in {}
millis", millis);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/FlowSnapshotProvider.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/FlowSnapshotProvider.java
new file mode 100644
index 0000000000..863c72d995
--- /dev/null
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/FlowSnapshotProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.core;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+
+import java.io.IOException;
+
+/**
+ * Abstraction for retrieving Versioned Flow Snapshot using provided coordinates
+ */
+public interface FlowSnapshotProvider {
+ /**
+ * Get Versioned Flow Snapshot with specified version
+ *
+ * @param bucketId Bucket Identifier
+ * @param flowId Flow Identifier
+ * @param version Version number or -1 indicating latest version
+ * @return Versioned Flow Snapshot or null when not found
+ * @throws IOException Thrown on failure to retrieve Versioned Flow Snapshot
+ */
+ VersionedFlowSnapshot getFlowSnapshot(String bucketId, String flowId, int
version) throws IOException;
+}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryFlowSnapshotProvider.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryFlowSnapshotProvider.java
new file mode 100644
index 0000000000..14aee4b341
--- /dev/null
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryFlowSnapshotProvider.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.core;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class RegistryFlowSnapshotProvider implements FlowSnapshotProvider {
+ private static final Logger logger =
LoggerFactory.getLogger(RegistryFlowSnapshotProvider.class);
+
+ private static final Pattern REGISTRY_URL_PATTERN =
Pattern.compile("^(https?://.+?)/?nifi-registry-api.*$");
+
+ private static final Duration TIMEOUT = Duration.ofSeconds(30);
+
+ private static final String FORWARD_SLASH = "/";
+
+ private static final String BUCKET_FLOW_PATH_FORMAT =
"buckets/%s/flows/%s";
+
+ private static final String BUCKET_FLOW_VERSION_PATH_FORMAT =
"buckets/%s/flows/%s/versions/%d";
+
+ private static final ObjectMapper objectMapper = new
ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+
+ private final String registryUrl;
+ private final HttpClient httpClient;
+ private final SSLContext sslContext;
+
+ public RegistryFlowSnapshotProvider(final String registryUrl, final
SSLContext sslContext) {
+ this.registryUrl = registryUrl;
+ this.sslContext = sslContext;
+
+ final HttpClient.Builder builder = HttpClient.newBuilder();
+ builder.connectTimeout(TIMEOUT);
+
+ if (sslContext != null) {
+ builder.sslContext(sslContext);
+ }
+ httpClient = builder.build();
+ }
+
+ @Override
+ public VersionedFlowSnapshot getFlowSnapshot(final String bucketID, final
String flowID, final int versionRequested) throws IOException {
+ final int version;
+ if (versionRequested == -1) {
+ version = getLatestVersion(bucketID, flowID);
+ } else {
+ version = versionRequested;
+ }
+
+ logger.debug("Fetching flow Bucket={}, Flow={}, Version={},
FetchRemoteFlows=true", bucketID, flowID, version);
+ final long start = System.nanoTime();
+ final VersionedFlowSnapshot snapshot = getFlowContents(bucketID,
flowID, version);
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
start);
+ logger.info("Fetched Flow [{}] Version [{}] from Bucket [{}] in {}
ms", flowID, version, bucketID, millis);
+
+ return snapshot;
+ }
+
+    /**
+     * Get the latest version number of a Flow by requesting the Flow from the Registry and reading its Version Count
+     *
+     * @param bucketId Bucket Identifier
+     * @param flowId Flow Identifier
+     * @return Version Count reported for the Flow converted to an integer
+     * @throws IOException Thrown on request, parsing, or numeric conversion failures with the original cause attached
+     */
+    private int getLatestVersion(final String bucketId, final String flowId) throws IOException {
+        final String path = BUCKET_FLOW_PATH_FORMAT.formatted(bucketId, flowId);
+        final URI uri = getUri(path);
+        final HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(uri);
+
+        try {
+            final byte[] responseBody = sendRequest(requestBuilder);
+            final VersionedFlow versionedFlow = objectMapper.readValue(responseBody, VersionedFlow.class);
+            final long versionCount = versionedFlow.getVersionCount();
+            // Math.toIntExact throws ArithmeticException instead of silently truncating the long value
+            return Math.toIntExact(versionCount);
+        } catch (final Exception e) {
+            // Attach the cause so that HTTP status, parsing, and overflow failures remain diagnosable
+            throw new IOException("Failed to get Latest Version for Bucket [%s] Flow [%s]".formatted(bucketId, flowId), e);
+        }
+    }
+
+ private VersionedFlowSnapshot getFlowContents(final String bucketId, final
String flowId, final int version) throws IOException {
+ final String path =
BUCKET_FLOW_VERSION_PATH_FORMAT.formatted(bucketId, flowId, version);
+ final URI uri = getUri(path);
+ final HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(uri);
+
+ try {
+ final byte[] responseBody = sendRequest(requestBuilder);
+ final VersionedFlowSnapshot flowSnapshot =
objectMapper.readValue(responseBody, VersionedFlowSnapshot.class);
+
+ final VersionedProcessGroup contents =
flowSnapshot.getFlowContents();
+ for (final VersionedProcessGroup child :
contents.getProcessGroups()) {
+ populateVersionedContentsRecursively(child);
+ }
+
+ return flowSnapshot;
+ } catch (final Exception e) {
+ throw new IOException("Failed to get contents for Flow [%s]
Version [%d] Bucket [%s]".formatted(flowId, version, bucketId), e);
+ }
+ }
+
+ protected String getBaseRegistryUrl(final String storageLocation) {
+ final Matcher matcher = REGISTRY_URL_PATTERN.matcher(storageLocation);
+ if (matcher.matches()) {
+ return matcher.group(1);
+ } else {
+ return storageLocation;
+ }
+ }
+
+    /**
+     * Replace the contents of Process Groups that carry Versioned Flow Coordinates with the contents of the
+     * referenced Flow Snapshot. Process Groups without coordinates are traversed to locate versioned descendants.
+     *
+     * @param group Process Group to populate or null, which is ignored
+     * @throws IOException Thrown on failure to retrieve a referenced Flow Snapshot
+     * @throws NumberFormatException Thrown by Integer.parseInt when the coordinates version is not an integer
+     */
+    private void populateVersionedContentsRecursively(final VersionedProcessGroup group) throws IOException {
+        if (group == null) {
+            return;
+        }
+
+        final VersionedFlowCoordinates coordinates = group.getVersionedFlowCoordinates();
+        if (coordinates == null) {
+            // Group is not under version control: descend to find versioned descendants
+            for (final VersionedProcessGroup child : group.getProcessGroups()) {
+                populateVersionedContentsRecursively(child);
+            }
+            return;
+        }
+
+        // NOTE(review): getBaseRegistryUrl returns the location with the nifi-registry-api segment removed and
+        // getUri appends the buckets path directly to the provider URL - confirm the nested request path is intended
+        final String subRegistryUrl = getBaseRegistryUrl(coordinates.getStorageLocation());
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = Integer.parseInt(coordinates.getVersion());
+
+        // getFlowSnapshot populates every versioned Process Group nested inside the returned contents,
+        // so descending into the replaced children again would only repeat the same Registry requests
+        final FlowSnapshotProvider nestedProvider = new RegistryFlowSnapshotProvider(subRegistryUrl, sslContext);
+        final VersionedFlowSnapshot snapshot = nestedProvider.getFlowSnapshot(bucketId, flowId, version);
+        final VersionedProcessGroup contents = snapshot.getFlowContents();
+
+        group.setComments(contents.getComments());
+        group.setConnections(contents.getConnections());
+        group.setControllerServices(contents.getControllerServices());
+        group.setFunnels(contents.getFunnels());
+        group.setInputPorts(contents.getInputPorts());
+        group.setLabels(contents.getLabels());
+        group.setOutputPorts(contents.getOutputPorts());
+        group.setProcessGroups(contents.getProcessGroups());
+        group.setProcessors(contents.getProcessors());
+        group.setRemoteProcessGroups(contents.getRemoteProcessGroups());
+        group.setFlowFileConcurrency(contents.getFlowFileConcurrency());
+        group.setFlowFileOutboundPolicy(contents.getFlowFileOutboundPolicy());
+        group.setDefaultFlowFileExpiration(contents.getDefaultFlowFileExpiration());
+        group.setDefaultBackPressureObjectThreshold(contents.getDefaultBackPressureObjectThreshold());
+        group.setDefaultBackPressureDataSizeThreshold(contents.getDefaultBackPressureDataSizeThreshold());
+        group.setLogFileSuffix(contents.getLogFileSuffix());
+        coordinates.setLatest(snapshot.isLatest());
+    }
+
+    /**
+     * Send the HTTP request with the configured timeout and return the response body for HTTP 200 responses
+     *
+     * @param requestBuilder Request Builder with the target URI already configured
+     * @return Response body bytes
+     * @throws IOException Thrown on communication failures, thread interruption, or any status other than HTTP 200
+     */
+    private byte[] sendRequest(final HttpRequest.Builder requestBuilder) throws IOException {
+        requestBuilder.timeout(TIMEOUT);
+        final HttpRequest request = requestBuilder.build();
+        final URI uri = request.uri();
+
+        final HttpResponse<byte[]> response;
+        try {
+            response = httpClient.send(request, HttpResponse.BodyHandlers.ofByteArray());
+        } catch (final IOException e) {
+            throw new IOException("Registry request failed [%s]".formatted(uri), e);
+        } catch (final InterruptedException e) {
+            // Restore the interrupt status so that callers and executors can observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IOException("Registry request interrupted [%s]".formatted(uri), e);
+        }
+
+        // Evaluate the status outside of the try block so the status failure is not caught and wrapped again
+        final int statusCode = response.statusCode();
+        if (HttpURLConnection.HTTP_OK == statusCode) {
+            return response.body();
+        }
+        throw new IOException("Registry request failed with HTTP %d [%s]".formatted(statusCode, uri));
+    }
+
+ private URI getUri(final String path) {
+ final StringBuilder builder = new StringBuilder();
+ builder.append(registryUrl);
+ if (!registryUrl.endsWith(FORWARD_SLASH)) {
+ builder.append(FORWARD_SLASH);
+ }
+ builder.append(path);
+ return URI.create(builder.toString());
+ }
+}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
deleted file mode 100644
index dce02af651..0000000000
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/core/RegistryUtil.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stateless.core;
-
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.flow.VersionedFlowCoordinates;
-import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.client.FlowClient;
-import org.apache.nifi.registry.client.FlowSnapshotClient;
-import org.apache.nifi.registry.client.NiFiRegistryClient;
-import org.apache.nifi.registry.client.NiFiRegistryClientConfig;
-import org.apache.nifi.registry.client.NiFiRegistryException;
-import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
-import org.apache.nifi.registry.client.impl.request.ProxiedEntityRequestConfig;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-public class RegistryUtil {
- private static final Logger logger =
LoggerFactory.getLogger(RegistryUtil.class);
-
- private static final Pattern REGISTRY_URL_PATTERN =
Pattern.compile("^(https?://.+?)/?nifi-registry-api.*$");
-
- private final String registryUrl;
- private NiFiRegistryClient registryClient;
- private final SSLContext sslContext;
-
- public RegistryUtil(final String registryUrl, final SSLContext sslContext)
{
- this.registryUrl = registryUrl;
- this.sslContext = sslContext;
- }
-
- public RegistryUtil(final NiFiRegistryClient registryClient, final String
registryUrl, final SSLContext sslContext) {
- this.registryClient = registryClient;
- this.registryUrl = registryUrl;
- this.sslContext = sslContext;
- }
-
- public VersionedFlowSnapshot getFlowByID(String bucketID, String flowID,
int versionID) throws IOException, NiFiRegistryException {
- if (versionID == -1) {
- // TODO: Have to support providing some sort of user
- versionID = getLatestVersion(bucketID, flowID, null);
- }
-
- logger.debug("Fetching flow Bucket={}, Flow={}, Version={},
FetchRemoteFlows=true", bucketID, flowID, versionID);
- final long start = System.nanoTime();
- final VersionedFlowSnapshot snapshot = getFlowContents(bucketID,
flowID, versionID, true, null);
- final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
start);
- logger.info("Successfully fetched flow from registry in {} millis",
millis);
-
- return snapshot;
- }
-
- private int getLatestVersion(final String bucketId, final String flowId,
final NiFiUser user) throws IOException, NiFiRegistryException {
- return (int) getFlowClient(user).get(bucketId,
flowId).getVersionCount();
- }
-
- private FlowClient getFlowClient(final NiFiUser user) {
- final String identity = getIdentity(user);
- final NiFiRegistryClient registryClient = getRegistryClient();
- final FlowClient flowClient = identity == null ?
registryClient.getFlowClient() : registryClient.getFlowClient(new
ProxiedEntityRequestConfig(identity));
- return flowClient;
- }
-
- private FlowSnapshotClient getFlowSnapshotClient(final NiFiUser user) {
- final String identity = getIdentity(user);
- final NiFiRegistryClient registryClient = getRegistryClient();
- final FlowSnapshotClient snapshotClient = identity == null ?
registryClient.getFlowSnapshotClient() :
registryClient.getFlowSnapshotClient(new ProxiedEntityRequestConfig(identity));
- return snapshotClient;
- }
-
- private synchronized NiFiRegistryClient getRegistryClient() {
- if (registryClient != null) {
- return registryClient;
- }
-
- final NiFiRegistryClientConfig config = new
NiFiRegistryClientConfig.Builder()
- .connectTimeout(30000)
- .readTimeout(30000)
- .sslContext(sslContext)
- .baseUrl(registryUrl)
- .build();
-
- registryClient = new JerseyNiFiRegistryClient.Builder()
- .config(config)
- .build();
-
- return registryClient;
- }
-
- private String getIdentity(final NiFiUser user) {
- return (user == null || user.isAnonymous()) ? null :
user.getIdentity();
- }
-
- public VersionedFlowSnapshot getFlowContents(final String bucketId, final
String flowId, final int version, final boolean fetchRemoteFlows, final
NiFiUser user)
- throws IOException, NiFiRegistryException {
-
- final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
- final VersionedFlowSnapshot flowSnapshot =
snapshotClient.get(bucketId, flowId, version);
-
- if (fetchRemoteFlows) {
- final VersionedProcessGroup contents =
flowSnapshot.getFlowContents();
- for (final VersionedProcessGroup child :
contents.getProcessGroups()) {
- populateVersionedContentsRecursively(child);
- }
- }
-
- return flowSnapshot;
- }
-
- protected String getBaseRegistryUrl(final String storageLocation) {
- final Matcher matcher = REGISTRY_URL_PATTERN.matcher(storageLocation);
- if (matcher.matches()) {
- return matcher.group(1);
- } else {
- return storageLocation;
- }
- }
-
- private void populateVersionedContentsRecursively(final
VersionedProcessGroup group) throws NiFiRegistryException, IOException {
- if (group == null) {
- return;
- }
-
- final VersionedFlowCoordinates coordinates =
group.getVersionedFlowCoordinates();
- if (coordinates != null) {
- final String subRegistryUrl =
getBaseRegistryUrl(coordinates.getStorageLocation());
- final String bucketId = coordinates.getBucketId();
- final String flowId = coordinates.getFlowId();
- final int version = Integer.parseInt(coordinates.getVersion());
-
- final RegistryUtil subFlowUtil =
getSubRegistryUtil(subRegistryUrl);
- final VersionedFlowSnapshot snapshot =
subFlowUtil.getFlowByID(bucketId, flowId, version);
- final VersionedProcessGroup contents = snapshot.getFlowContents();
-
- group.setComments(contents.getComments());
- group.setConnections(contents.getConnections());
- group.setControllerServices(contents.getControllerServices());
- group.setFunnels(contents.getFunnels());
- group.setInputPorts(contents.getInputPorts());
- group.setLabels(contents.getLabels());
- group.setOutputPorts(contents.getOutputPorts());
- group.setProcessGroups(contents.getProcessGroups());
- group.setProcessors(contents.getProcessors());
- group.setRemoteProcessGroups(contents.getRemoteProcessGroups());
- group.setFlowFileConcurrency(contents.getFlowFileConcurrency());
-
group.setFlowFileOutboundPolicy(contents.getFlowFileOutboundPolicy());
-
group.setDefaultFlowFileExpiration(contents.getDefaultFlowFileExpiration());
-
group.setDefaultBackPressureObjectThreshold(contents.getDefaultBackPressureObjectThreshold());
-
group.setDefaultBackPressureDataSizeThreshold(contents.getDefaultBackPressureDataSizeThreshold());
- group.setLogFileSuffix(contents.getLogFileSuffix());
- coordinates.setLatest(snapshot.isLatest());
- }
-
- for (final VersionedProcessGroup child : group.getProcessGroups()) {
- populateVersionedContentsRecursively(child);
- }
- }
-
- private RegistryUtil getSubRegistryUtil(final String subRegistryUrl) {
- final RegistryUtil subRegistryUtil;
- if (registryUrl.startsWith(subRegistryUrl)) {
- // Share current Registry Client for matching Registry URL
- subRegistryUtil = new RegistryUtil(registryClient, subRegistryUrl,
sslContext);
- } else {
- subRegistryUtil = new RegistryUtil(subRegistryUrl, sslContext);
- }
- return subRegistryUtil;
- }
-}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/RegistryFlowSnapshotProviderTest.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/RegistryFlowSnapshotProviderTest.java
new file mode 100644
index 0000000000..65e44f8b93
--- /dev/null
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/RegistryFlowSnapshotProviderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.core;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import mockwebserver3.RecordedRequest;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
+class RegistryFlowSnapshotProviderTest {
+ private static final int REQUEST_TIMEOUT = 2;
+
+ private static final String API_PATH = "/nifi-registry-api";
+
+ private static final String BUCKET_ID = "testing";
+
+ private static final String FLOW_ID = "primary";
+
+ private static final int VERSION = 100;
+
+ private static final int LATEST_VERSION = -1;
+
+ private static final String FLOW_PATH =
"/nifi-registry-api/buckets/testing/flows/primary";
+
+ private static final String FLOW_VERSION_PATH =
"/nifi-registry-api/buckets/testing/flows/primary/versions/100";
+
+ private static final String CONTENT_TYPE_HEADER = "Content-Type";
+
+ private static final String APPLICATION_JSON = "application/json";
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ private MockWebServer mockWebServer;
+
+ private RegistryFlowSnapshotProvider provider;
+
+ @BeforeEach
+ void startServer() throws IOException {
+ mockWebServer = new MockWebServer();
+ mockWebServer.start();
+
+ final String url = mockWebServer.url(API_PATH).toString();
+
+ provider = new RegistryFlowSnapshotProvider(url, null);
+ }
+
+ @AfterEach
+ void shutdownServer() {
+ mockWebServer.close();
+ }
+
+ @Test
+ void testGetFlowSnapshot() throws Exception {
+ enqueueSnapshotResponse();
+
+ final VersionedFlowSnapshot snapshot =
provider.getFlowSnapshot(BUCKET_ID, FLOW_ID, VERSION);
+
+ assertNotNull(snapshot);
+ assertFlowVersionRequestRecorded();
+ }
+
+ @Test
+ void testGetFlowSnapshotLatestVersion() throws Exception {
+ enqueueFlowResponse();
+ enqueueSnapshotResponse();
+
+ final VersionedFlowSnapshot snapshot =
provider.getFlowSnapshot(BUCKET_ID, FLOW_ID, LATEST_VERSION);
+
+ assertNotNull(snapshot);
+ assertFlowRequestRecorded();
+ assertFlowVersionRequestRecorded();
+ }
+
+ private void enqueueFlowResponse() throws JsonProcessingException {
+ final VersionedFlow versionedFlow = new VersionedFlow();
+ versionedFlow.setVersionCount(VERSION);
+
+ final String responseBody =
objectMapper.writeValueAsString(versionedFlow);
+
+ final MockResponse response = new MockResponse.Builder()
+ .code(HttpURLConnection.HTTP_OK)
+ .setHeader(CONTENT_TYPE_HEADER, APPLICATION_JSON)
+ .body(responseBody)
+ .build();
+ mockWebServer.enqueue(response);
+ }
+
+ private void enqueueSnapshotResponse() throws JsonProcessingException {
+ final VersionedFlowSnapshot snapshotExpected = new
VersionedFlowSnapshot();
+ final VersionedProcessGroup flowContents = new VersionedProcessGroup();
+ snapshotExpected.setFlowContents(flowContents);
+
+ final String responseBody =
objectMapper.writeValueAsString(snapshotExpected);
+
+ final MockResponse response = new MockResponse.Builder()
+ .code(HttpURLConnection.HTTP_OK)
+ .setHeader(CONTENT_TYPE_HEADER, APPLICATION_JSON)
+ .body(responseBody)
+ .build();
+ mockWebServer.enqueue(response);
+ }
+
+ private void assertFlowRequestRecorded() throws InterruptedException {
+ final RecordedRequest request =
mockWebServer.takeRequest(REQUEST_TIMEOUT, TimeUnit.SECONDS);
+ assertNotNull(request);
+ final String encodedPath = request.getUrl().encodedPath();
+ assertEquals(FLOW_PATH, encodedPath);
+ }
+
+ private void assertFlowVersionRequestRecorded() throws
InterruptedException {
+ final RecordedRequest request =
mockWebServer.takeRequest(REQUEST_TIMEOUT, TimeUnit.SECONDS);
+ assertNotNull(request);
+ final String encodedPath = request.getUrl().encodedPath();
+ assertEquals(FLOW_VERSION_PATH, encodedPath);
+ }
+}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java
deleted file mode 100644
index a992c814ad..0000000000
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/core/TestRegistryUtil.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.stateless.core;
-
-import org.apache.nifi.flow.VersionedFlowCoordinates;
-import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.registry.client.FlowSnapshotClient;
-import org.apache.nifi.registry.client.NiFiRegistryClient;
-import org.apache.nifi.registry.client.NiFiRegistryException;
-import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
-import java.util.UUID;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestRegistryUtil {
- private static final String BASE_REGISTRY_URL =
"https://localhost:18443/context-path";
-
- private static final String STORAGE_LOCATION_FORMAT =
"%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
-
- private static final String ROOT_BUCKET_ID = UUID.randomUUID().toString();
- private static final String ROOT_FLOW_ID = UUID.randomUUID().toString();
- private static final int ROOT_VERSION = 1;
-
- private static final String CHILD_BUCKET_ID = UUID.randomUUID().toString();
- private static final String CHILD_FLOW_ID = UUID.randomUUID().toString();
- private static final int CHILD_VERSION = 2;
-
- @Test
- public void testGetBaseRegistryUrl() throws NiFiRegistryException,
IOException {
- final NiFiRegistryClient registryClient =
mock(NiFiRegistryClient.class);
- final RegistryUtil registryUtil = new RegistryUtil(registryClient,
BASE_REGISTRY_URL, null);
-
- final FlowSnapshotClient flowSnapshotClient =
mock(FlowSnapshotClient.class);
-
when(registryClient.getFlowSnapshotClient()).thenReturn(flowSnapshotClient);
-
- final VersionedProcessGroup childVersionedProcessGroup =
getChildVersionedProcessGroup();
- final VersionedFlowSnapshot rootSnapshot =
buildRootSnapshot(Collections.singleton(childVersionedProcessGroup));
- final String rootRegistryUrl =
registryUtil.getBaseRegistryUrl(rootSnapshot.getFlowContents().getVersionedFlowCoordinates().getStorageLocation());
- assertEquals(BASE_REGISTRY_URL, rootRegistryUrl);
-
- final VersionedFlowSnapshot childSnapshot = new
VersionedFlowSnapshot();
- childSnapshot.setFlowContents(childVersionedProcessGroup);
- final String childRegistryUrl =
registryUtil.getBaseRegistryUrl(childVersionedProcessGroup.getVersionedFlowCoordinates().getStorageLocation());
- assertEquals(BASE_REGISTRY_URL, childRegistryUrl);
-
- when(flowSnapshotClient.get(eq(ROOT_BUCKET_ID), eq(ROOT_FLOW_ID),
eq(ROOT_VERSION))).thenReturn(rootSnapshot);
- when(flowSnapshotClient.get(eq(CHILD_BUCKET_ID), eq(CHILD_FLOW_ID),
eq(CHILD_VERSION))).thenReturn(childSnapshot);
-
- final VersionedFlowSnapshot flowSnapshot =
registryUtil.getFlowContents(ROOT_BUCKET_ID, ROOT_FLOW_ID, ROOT_VERSION, true,
null);
- assertEquals(rootSnapshot, flowSnapshot);
- }
-
- private VersionedFlowSnapshot buildRootSnapshot(final
Set<VersionedProcessGroup> childGroups) {
- final String storageLocation = String.format(STORAGE_LOCATION_FORMAT,
BASE_REGISTRY_URL, ROOT_BUCKET_ID, ROOT_FLOW_ID, ROOT_VERSION);
- final VersionedFlowCoordinates coordinates = new
VersionedFlowCoordinates();
- coordinates.setStorageLocation(storageLocation);
- coordinates.setBucketId(ROOT_BUCKET_ID);
- coordinates.setFlowId(ROOT_FLOW_ID);
- coordinates.setVersion(String.valueOf(ROOT_VERSION));
-
- final VersionedProcessGroup group = new VersionedProcessGroup();
- group.setVersionedFlowCoordinates(coordinates);
- group.setProcessGroups(childGroups);
-
- final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
- snapshot.setFlowContents(group);
- return snapshot;
- }
-
- private VersionedProcessGroup getChildVersionedProcessGroup() {
- final String storageLocation = String.format(STORAGE_LOCATION_FORMAT,
BASE_REGISTRY_URL, CHILD_BUCKET_ID, CHILD_FLOW_ID, CHILD_VERSION);
- final VersionedFlowCoordinates coordinates = new
VersionedFlowCoordinates();
- coordinates.setStorageLocation(storageLocation);
- coordinates.setBucketId(CHILD_BUCKET_ID);
- coordinates.setFlowId(CHILD_FLOW_ID);
- coordinates.setVersion(String.valueOf(CHILD_VERSION));
-
- final VersionedProcessGroup group = new VersionedProcessGroup();
- group.setVersionedFlowCoordinates(coordinates);
-
- return group;
- }
-}
\ No newline at end of file