This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4db5f05436 NIFI-15214 Refactored site-to-site-client using Java
HttpClient
4db5f05436 is described below
commit 4db5f05436fe75ddc3441757338c2535e4d33214
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Nov 12 13:28:53 2025 -0600
NIFI-15214 Refactored site-to-site-client using Java HttpClient
- Refactored ClusterLoadBalanceAuthorizer wildcard certificate verification
- Removed Apache HttpClient 4 from framework modules
Signed-off-by: Pierre Villard <[email protected]>
This closes #10522.
---
nifi-commons/nifi-site-to-site-client/pom.xml | 53 +-
.../nifi/remote/protocol/http/HttpProxy.java | 12 -
.../apache/nifi/remote/util/ClusterUrlParser.java | 140 +++
.../nifi/remote/util/SiteToSiteRestApiClient.java | 1211 ++++----------------
.../remote/util/SiteToSiteRestApiClientTest.java | 497 ++++++++
.../remote/util/TestSiteToSiteRestApiClient.java | 2 +-
.../apache/nifi/reporting/s2s/SiteToSiteUtils.java | 6 +-
.../nifi/remote/StandardRemoteProcessGroup.java | 60 +-
.../nifi-framework/nifi-framework-core/pom.xml | 4 -
.../server/ClusterLoadBalanceAuthorizer.java | 41 +-
.../server/ClusteredLoadBalanceAuthorizerTest.java | 131 +++
.../nifi/remote/StandardRemoteGroupPort.java | 4 +-
.../nifi-framework/nifi-web/nifi-jetty/pom.xml | 4 -
.../apache/nifi/web/api/ProcessGroupResource.java | 4 +-
14 files changed, 1076 insertions(+), 1093 deletions(-)
diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml
b/nifi-commons/nifi-site-to-site-client/pom.xml
index 8423c64997..c19b3bc8d7 100644
--- a/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -24,10 +24,6 @@
<artifactId>nifi-site-to-site-client</artifactId>
- <properties>
-
<httpclient.version>${org.apache.httpcomponents.httpclient.version}</httpclient.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -71,53 +67,22 @@
<version>2.7.0-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore-nio</artifactId>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>jakarta.xml.bind</groupId>
+ <artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpasyncclient</artifactId>
- <version>4.1.5</version>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>jakarta.xml.bind</groupId>
- <artifactId>jakarta.xml.bind-api</artifactId>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver3</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
index 4c0ebe73b4..ebfbbe413c 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
@@ -16,9 +16,6 @@
*/
package org.apache.nifi.remote.protocol.http;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-
public class HttpProxy {
private final String host;
private final Integer port;
@@ -32,7 +29,6 @@ public class HttpProxy {
this.password = password;
}
-
public String getHost() {
return host;
}
@@ -48,12 +44,4 @@ public class HttpProxy {
public String getPassword() {
return password;
}
-
- public HttpHost getHttpHost() {
- if (StringUtils.isEmpty(host)) {
- return null;
- }
- return new HttpHost(host, port == null ? 80 : port);
- }
-
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ClusterUrlParser.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ClusterUrlParser.java
new file mode 100644
index 0000000000..1426765833
--- /dev/null
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ClusterUrlParser.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Site-To-Site Cluster URL Parser
+ */
+public class ClusterUrlParser {
+ /**
+ * Parse the comma-separated URLs string for the remote NiFi instances.
+ *
+ * @return A set containing one or more URLs
+ * @throws IllegalArgumentException when it fails to parse the URLs string,
+ * URLs string contains multiple protocols (http and https mix),
+ * or none of URL is specified.
+ */
+ public static Set<String> parseClusterUrls(final String clusterUrls) {
+ final Set<String> urls = new LinkedHashSet<>();
+ if (clusterUrls != null && !clusterUrls.isEmpty()) {
+ Arrays.stream(clusterUrls.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .forEach(s -> {
+ validateUriString(s);
+ urls.add(resolveBaseUrl(s).intern());
+ });
+ }
+
+ if (urls.isEmpty()) {
+ throw new IllegalArgumentException("Cluster URL was not
specified.");
+ }
+
+ final Predicate<String> isHttps = url ->
url.toLowerCase().startsWith("https:");
+ if (urls.stream().anyMatch(isHttps) &&
urls.stream().anyMatch(isHttps.negate())) {
+ throw new IllegalArgumentException("Different protocols are used
in the cluster URLs " + clusterUrls);
+ }
+
+ return Collections.unmodifiableSet(urls);
+ }
+
+ static String resolveBaseUrl(final String clusterUrl) {
+ Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null.");
+ final URI uri;
+ try {
+ uri = new URI(clusterUrl.trim());
+ } catch (final URISyntaxException e) {
+ throw new IllegalArgumentException("The specified URL is
malformed: " + clusterUrl);
+ }
+
+ return resolveBaseUrl(uri);
+ }
+
+ private static void validateUriString(String s) {
+ // parse the uri
+ final URI uri;
+ try {
+ uri = URI.create(s);
+ } catch (final IllegalArgumentException e) {
+ throw new IllegalArgumentException("The specified remote process
group URL is malformed: " + s);
+ }
+
+ // validate each part of the uri
+ if (uri.getScheme() == null || uri.getHost() == null) {
+ throw new IllegalArgumentException("The specified remote process
group URL is malformed: " + s);
+ }
+
+ if (!(uri.getScheme().equalsIgnoreCase("http") ||
uri.getScheme().equalsIgnoreCase("https"))) {
+ throw new IllegalArgumentException("The specified remote process
group URL is invalid because it is not http or https: " + s);
+ }
+ }
+
+ /**
+ * Resolve NiFi API url with leniency. This method does following
conversion on uri path:
+ * <ul>
+ * <li>/ to /nifi-api</li>
+ * <li>/nifi to /nifi-api</li>
+ * <li>/some/path/ to /some/path/nifi-api</li>
+ * </ul>
+ * @param clusterUrl url to be resolved
+ * @return resolved url
+ */
+ private static String resolveBaseUrl(final URI clusterUrl) {
+
+ if (clusterUrl.getScheme() == null || clusterUrl.getHost() == null) {
+ throw new IllegalArgumentException("The specified URL is
malformed: " + clusterUrl);
+ }
+
+ if (!(clusterUrl.getScheme().equalsIgnoreCase("http") ||
clusterUrl.getScheme().equalsIgnoreCase("https"))) {
+ throw new IllegalArgumentException("The specified URL is invalid
because it is not http or https: " + clusterUrl);
+ }
+
+
+ String uriPath = clusterUrl.getPath().trim();
+
+ if (StringUtils.isEmpty(uriPath) || uriPath.equals("/")) {
+ uriPath = "/nifi";
+ } else if (uriPath.endsWith("/")) {
+ uriPath = uriPath.substring(0, uriPath.length() - 1);
+ }
+
+ final StringBuilder uriPathBuilder = new StringBuilder(uriPath);
+ if (uriPath.endsWith("/nifi")) {
+ uriPathBuilder.append("-api");
+ } else if (!uriPath.endsWith("/nifi-api")) {
+ uriPathBuilder.append("/nifi-api");
+ }
+
+ try {
+ final URI uri = new URI(clusterUrl.getScheme(), null,
clusterUrl.getHost(), clusterUrl.getPort(), uriPathBuilder.toString(), null,
null);
+ return uri.toString();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 523f22d8c3..10e6fb1a3b 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -27,79 +27,40 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
+import java.net.Authenticator;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.PasswordAuthentication;
+import java.net.ProxySelector;
import java.net.URI;
import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
-import java.util.Arrays;
+import java.time.Duration;
import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
+
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpException;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpInetConnection;
-import org.apache.http.HttpRequest;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpResponseInterceptor;
-import org.apache.http.StatusLine;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.AuthState;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.client.utils.URIUtils;
-import org.apache.http.conn.ManagedHttpClientConnection;
-import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
-import org.apache.http.nio.ContentEncoder;
-import org.apache.http.nio.IOControl;
-import org.apache.http.nio.conn.ManagedNHttpClientConnection;
-import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
-import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
-import org.apache.http.protocol.HttpContext;
-import org.apache.http.protocol.HttpCoreContext;
-import org.apache.http.util.EntityUtils;
import org.apache.nifi.remote.SiteToSiteEventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection;
@@ -115,7 +76,6 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.remote.protocol.http.HttpProxy;
-import org.apache.nifi.reporting.Severity;
import org.apache.nifi.security.cert.StandardPrincipalFormatter;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.api.dto.ControllerDTO;
@@ -126,6 +86,11 @@ import
org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.net.HttpURLConnection.HTTP_ACCEPTED;
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
+import static java.net.HttpURLConnection.HTTP_OK;
import static
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
@@ -137,31 +102,22 @@ import static
org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTE
public class SiteToSiteRestApiClient implements Closeable {
- private static final String EVENT_CATEGORY = "Site-to-Site";
+ private static final String ACCEPT_HEADER = "Accept";
+ private static final String APPLICATION_JSON = "application/json";
private static final int DATA_PACKET_CHANNEL_READ_BUFFER_SIZE = 16384;
- private static final int RESPONSE_CODE_OK = 200;
- private static final int RESPONSE_CODE_CREATED = 201;
- private static final int RESPONSE_CODE_ACCEPTED = 202;
- private static final int RESPONSE_CODE_BAD_REQUEST = 400;
- private static final int RESPONSE_CODE_FORBIDDEN = 403;
- private static final int RESPONSE_CODE_NOT_FOUND = 404;
-
private static final Logger logger =
LoggerFactory.getLogger(SiteToSiteRestApiClient.class);
+ private static final ObjectMapper objectMapper = new
ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
private String baseUrl;
protected final SSLContext sslContext;
protected final HttpProxy proxy;
- private final AtomicBoolean proxyAuthRequiresResend = new
AtomicBoolean(false);
private final SiteToSiteEventReporter eventReporter;
- private RequestConfig requestConfig;
- private CredentialsProvider credentialsProvider;
- private CloseableHttpClient httpClient;
- private CloseableHttpAsyncClient httpAsyncClient;
+ private final HttpClient.Builder httpClientBuilder;
+ private HttpClient httpClient;
private boolean compress = false;
- private InetAddress localAddress = null;
private long requestExpirationMillis = 0;
private int serverTransactionTtl = 0;
private int batchCount = 0;
@@ -173,159 +129,63 @@ public class SiteToSiteRestApiClient implements
Closeable {
private final ScheduledExecutorService ttlExtendTaskExecutor;
private ScheduledFuture<?> ttlExtendingFuture;
- private int connectTimeoutMillis;
private int readTimeoutMillis;
private long cacheExpirationMillis = 30000L;
private static final Pattern HTTP_ABS_URL =
Pattern.compile("^https?://.+$");
- private Future<HttpResponse> postResult;
- private CountDownLatch transferDataLatch = new CountDownLatch(1);
+ private CompletableFuture<HttpResponse<String>> transactionFuture;
private static final ConcurrentMap<String, RemoteGroupContents>
contentsMap = new ConcurrentHashMap<>();
- private volatile long lastPruneTimestamp = System.currentTimeMillis();
+ private final long lastPruneTimestamp = System.currentTimeMillis();
public SiteToSiteRestApiClient(final SSLContext sslContext, final
HttpProxy proxy, final SiteToSiteEventReporter eventReporter) {
this.sslContext = sslContext;
this.proxy = proxy;
this.eventReporter = eventReporter;
- ttlExtendTaskExecutor = Executors.newScheduledThreadPool(1, new
ThreadFactory() {
- private final ThreadFactory defaultFactory =
Executors.defaultThreadFactory();
+ final ThreadFactory threadFactory =
Thread.ofVirtual().name(SiteToSiteRestApiClient.class.getSimpleName(),
1).factory();
+ ttlExtendTaskExecutor = Executors.newScheduledThreadPool(1,
threadFactory);
- @Override
- public Thread newThread(final Runnable r) {
- final Thread thread = defaultFactory.newThread(r);
- thread.setName(Thread.currentThread().getName() + "
Site-to-Site Extend Transactions");
- thread.setDaemon(true);
- return thread;
- }
- });
+ httpClientBuilder = HttpClient.newBuilder();
+ if (sslContext != null) {
+ httpClientBuilder.sslContext(sslContext);
+ }
+ if (proxy != null && proxy.getPort() != null) {
+ final InetSocketAddress proxyAddress = new
InetSocketAddress(proxy.getHost(), proxy.getPort());
+ final ProxySelector proxySelector = ProxySelector.of(proxyAddress);
+ httpClientBuilder.proxy(proxySelector);
+ httpClientBuilder.authenticator(getProxyAuthenticator());
+ }
+ httpClient = httpClientBuilder.build();
}
@Override
public void close() throws IOException {
stopExtendingTransaction();
- closeSilently(httpClient);
- closeSilently(httpAsyncClient);
- }
-
- private CloseableHttpClient getHttpClient() {
- if (httpClient == null) {
- setupClient();
- }
- return httpClient;
- }
-
- private CloseableHttpAsyncClient getHttpAsyncClient() {
- if (httpAsyncClient == null) {
- setupAsyncClient();
- }
- return httpAsyncClient;
- }
-
- private RequestConfig getRequestConfig() {
- if (requestConfig == null) {
- setupRequestConfig();
- }
- return requestConfig;
- }
-
- private CredentialsProvider getCredentialsProvider() {
- if (credentialsProvider == null) {
- setupCredentialsProvider();
- }
- return credentialsProvider;
- }
-
- private void setupRequestConfig() {
- final RequestConfig.Builder requestConfigBuilder =
RequestConfig.custom()
- .setConnectionRequestTimeout(connectTimeoutMillis)
- .setConnectTimeout(connectTimeoutMillis)
- .setSocketTimeout(readTimeoutMillis);
-
- if (localAddress != null) {
- requestConfigBuilder.setLocalAddress(localAddress);
- }
-
- if (proxy != null) {
- requestConfigBuilder.setProxy(proxy.getHttpHost());
- }
-
- requestConfig = requestConfigBuilder.build();
- }
-
- private void setupCredentialsProvider() {
- credentialsProvider = new BasicCredentialsProvider();
- if (proxy != null) {
- if (StringUtils.isNotEmpty(proxy.getUsername()) &&
StringUtils.isNotEmpty(proxy.getPassword())) {
- credentialsProvider.setCredentials(
- new AuthScope(proxy.getHttpHost()),
- new UsernamePasswordCredentials(proxy.getUsername(),
proxy.getPassword()));
- }
-
- }
- }
-
- private void setupClient() {
- final HttpClientBuilder clientBuilder = HttpClients.custom();
-
- if (sslContext != null) {
- clientBuilder.setSSLContext(sslContext);
- clientBuilder.addInterceptorFirst(new HttpsResponseInterceptor());
- }
-
- httpClient = clientBuilder
- .setDefaultCredentialsProvider(getCredentialsProvider()).build();
- }
-
- private void setupAsyncClient() {
- final HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
-
- if (sslContext != null) {
- clientBuilder.setSSLContext(sslContext);
- clientBuilder.addInterceptorFirst(new HttpsResponseInterceptor());
- }
-
- httpAsyncClient =
clientBuilder.setDefaultCredentialsProvider(getCredentialsProvider()).build();
- httpAsyncClient.start();
+ httpClient.shutdown();
+ httpClient.close();
}
- private class HttpsResponseInterceptor implements HttpResponseInterceptor {
- @Override
- public void process(final HttpResponse response, final HttpContext
httpContext) throws HttpException, IOException {
- final HttpCoreContext coreContext =
HttpCoreContext.adapt(httpContext);
- final HttpInetConnection conn =
coreContext.getConnection(HttpInetConnection.class);
- if (!conn.isOpen()) {
- return;
- }
+ private Authenticator getProxyAuthenticator() {
+ final String username = proxy.getUsername();
+ final char[] password = proxy.getPassword().toCharArray();
- final SSLSession sslSession;
- if (conn instanceof ManagedHttpClientConnection) {
- sslSession = ((ManagedHttpClientConnection)
conn).getSSLSession();
- } else if (conn instanceof ManagedNHttpClientConnection) {
- sslSession = ((ManagedNHttpClientConnection)
conn).getSSLSession();
- } else {
- throw new RuntimeException("Unexpected connection type was
used, " + conn);
- }
+ return new Authenticator() {
+ private final PasswordAuthentication authentication = new
PasswordAuthentication(username, password);
+ @Override
+ protected PasswordAuthentication getPasswordAuthentication() {
+ final PasswordAuthentication passwordAuthentication;
- if (sslSession != null) {
- final Certificate[] certChain =
sslSession.getPeerCertificates();
- if (certChain == null || certChain.length == 0) {
- throw new SSLPeerUnverifiedException("No certificates
found");
+ if (RequestorType.PROXY == getRequestorType()) {
+ passwordAuthentication = authentication;
+ } else {
+ passwordAuthentication = null;
}
- try {
- final X509Certificate cert = (X509Certificate)
certChain[0];
- trustedPeerDn =
StandardPrincipalFormatter.getInstance().getSubject(cert);
- } catch (final RuntimeException e) {
- final String msg = "Could not extract subject DN from SSL
session peer certificate";
- logger.warn(msg);
- eventReporter.reportEvent(Severity.WARNING,
EVENT_CATEGORY, msg);
- throw new SSLPeerUnverifiedException(msg);
- }
+ return passwordAuthentication;
}
- }
+ };
}
/**
@@ -338,7 +198,7 @@ public class SiteToSiteRestApiClient implements Closeable {
* or none of URL is specified.
*/
public ControllerDTO getController(final String clusterUrls) throws
IOException {
- return getController(parseClusterUrls(clusterUrls));
+ return getController(ClusterUrlParser.parseClusterUrls(clusterUrls));
}
/**
@@ -347,19 +207,15 @@ public class SiteToSiteRestApiClient implements Closeable
{
* After this method execution, the base URL is set with the successful
URL.
*/
public ControllerDTO getController(final Set<String> clusterUrls) throws
IOException {
-
- IOException lastException = null;
+ IOException lastException = new IOException("Get Controller failed");
for (final String clusterUrl : clusterUrls) {
// The url may not be normalized if it passed directly without
parsed with parseClusterUrls.
- setBaseUrl(resolveBaseUrl(clusterUrl));
+ setBaseUrl(ClusterUrlParser.resolveBaseUrl(clusterUrl));
try {
return getController();
} catch (IOException e) {
lastException = e;
logger.warn("Failed to get controller from {}", clusterUrl, e);
- if (logger.isDebugEnabled()) {
- logger.debug("", e);
- }
}
}
@@ -386,7 +242,8 @@ public class SiteToSiteRestApiClient implements Closeable {
final ControllerDTO refreshedContents;
try {
- refreshedContents = fetchController();
+ final HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(getUri("/site-to-site"));
+ refreshedContents = send(requestBuilder,
ControllerEntity.class).getController();
} catch (final Exception e) {
// we failed to refresh contents, but we don't want to
constantly poll the remote instance, failing.
// So we put the ControllerDTO back but use a new
RemoteGroupContents so that we get a new timestamp.
@@ -408,20 +265,6 @@ public class SiteToSiteRestApiClient implements Closeable {
}
}
- private ControllerDTO fetchController() throws IOException {
- try {
- final HttpGet get = createGetControllerRequest();
- return execute(get, ControllerEntity.class).getController();
- } catch (final HttpGetFailedException e) {
- if (RESPONSE_CODE_NOT_FOUND == e.getResponseCode()) {
- logger.debug("getController received NOT_FOUND, trying to
access the old NiFi version resource url...");
- final HttpGet get = createGet("/controller");
- return execute(get, ControllerEntity.class).getController();
- }
- throw e;
- }
- }
-
private void pruneCache() {
for (final Map.Entry<String, RemoteGroupContents> entry :
contentsMap.entrySet()) {
final String url = entry.getKey();
@@ -436,234 +279,61 @@ public class SiteToSiteRestApiClient implements
Closeable {
}
}
- private HttpGet createGetControllerRequest() {
- final HttpGet get = createGet("/site-to-site");
- get.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
- return get;
- }
-
public Collection<PeerDTO> getPeers() throws IOException {
- final HttpGet get = createGet("/site-to-site/peers");
- get.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
- return execute(get, PeersEntity.class).getPeers();
+ final HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(getUri("/site-to-site/peers"));
+ return send(requestBuilder, PeersEntity.class).getPeers();
}
public String initiateTransaction(final TransferDirection direction, final
String portId) throws IOException {
-
final String portType = TransferDirection.RECEIVE.equals(direction) ?
"output-ports" : "input-ports";
logger.debug("initiateTransaction handshaking portType={}, portId={}",
portType, portId);
- final HttpPost post = createPost("/data-transfer/" + portType + "/" +
portId + "/transactions");
+ final URI uri = getUri("/data-transfer/" + portType + "/" + portId +
"/transactions");
+ final HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(uri).POST(HttpRequest.BodyPublishers.noBody());
+ requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
- post.setHeader("Accept", "application/json");
- post.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-
- setHandshakeProperties(post);
-
- final HttpResponse response;
+ final HttpResponse<InputStream> response;
if (TransferDirection.RECEIVE.equals(direction)) {
- response = initiateTransactionForReceive(post);
+ response = sendRequest(requestBuilder);
} else {
- response = initiateTransactionForSend(post);
+ if (shouldCheckProxyAuth()) {
+ getController();
+ }
+ response = sendRequest(requestBuilder);
}
- final int responseCode = response.getStatusLine().getStatusCode();
+ final int responseCode = response.statusCode();
logger.debug("initiateTransaction responseCode={}", responseCode);
String transactionUrl;
- if (responseCode == RESPONSE_CODE_CREATED) {
- EntityUtils.consume(response.getEntity());
+ if (responseCode == HTTP_CREATED) {
+ response.body().close();
transactionUrl = readTransactionUrl(response);
if (StringUtils.isEmpty(transactionUrl)) {
throw new ProtocolException("Server returned
RESPONSE_CODE_CREATED without Location header");
}
- final Header transportProtocolVersionHeader =
response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
- if (transportProtocolVersionHeader == null) {
- throw new ProtocolException("Server didn't return confirmed
protocol version");
+ final Optional<String> transportProtocolVersionHeader =
response.headers().firstValue(HttpHeaders.PROTOCOL_VERSION);
+ if (transportProtocolVersionHeader.isEmpty()) {
+ throw new ProtocolException("Transport Protocol Response
Header not found");
}
- final int protocolVersionConfirmedByServer =
Integer.parseInt(transportProtocolVersionHeader.getValue());
+ final int protocolVersionConfirmedByServer =
Integer.parseInt(transportProtocolVersionHeader.get());
logger.debug("Finished version negotiation,
protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
- final Header serverTransactionTtlHeader =
response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
- if (serverTransactionTtlHeader == null) {
- throw new ProtocolException("Server didn't return " +
HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
+ final Optional<String> serverTransactionTtlHeader =
response.headers().firstValue(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
+ if (serverTransactionTtlHeader.isEmpty()) {
+ throw new ProtocolException("Transaction TTL Response Header
not found");
}
- serverTransactionTtl =
Integer.parseInt(serverTransactionTtlHeader.getValue());
+ serverTransactionTtl =
Integer.parseInt(serverTransactionTtlHeader.get());
} else {
- try (InputStream content = response.getEntity().getContent()) {
+ try (InputStream content = response.body()) {
throw handleErrResponse(responseCode, content);
}
}
logger.debug("initiateTransaction handshaking finished,
transactionUrl={}", transactionUrl);
return transactionUrl;
-
- }
-
- /**
- * Initiate a transaction for receiving data.
- * @param post a POST request to establish transaction
- * @return POST request response
- * @throws IOException thrown if the post request failed
- */
- private HttpResponse initiateTransactionForReceive(final HttpPost post)
throws IOException {
- return getHttpClient().execute(post);
- }
-
- /**
- * <p>
- * Initiate a transaction for sending data.
- * </p>
- *
- * <p>
- * If a proxy server requires auth, the proxy server returns 407 response
with available auth schema such as basic or digest.
- * Then client has to resend the same request with its credential added.
- * This mechanism is problematic for sending data from NiFi.
- * </p>
- *
- * <p>
- * In order to resend a POST request with auth param,
- * NiFi has to either read flow-file contents to send again, or keep the
POST body somewhere.
- * If we store that in memory, it would causes OOM, or storing it on disk
slows down performance.
- * Rolling back processing session would be overkill.
- * Reading flow-file contents only when it's ready to send in a streaming
way is ideal.
- * </p>
- *
- * <p>
- * Additionally, the way proxy authentication is done is vary among Proxy
server software.
- * Some requires 407 and resend cycle for every requests, while others
keep a connection between a client and
- * the proxy server, then consecutive requests skip auth steps.
- * The problem is, that how should we behave is only told after sending a
request to the proxy.
- * </p>
- *
- * In order to handle above concerns correctly and efficiently, this
method do the followings:
- *
- * <ol>
- * <li>Send a GET request to controller resource, to initiate an
HttpAsyncClient. The instance will be used for further requests.
- * This is not required by the Site-to-Site protocol, but it can
setup proxy auth state safely.</li>
- * <li>Send a POST request to initiate a transaction. While doing so, it
captures how a proxy server works.
- * If 407 and resend cycle occurs here, it implies that we need to do the
same thing again when we actually send the data.
- * Because if the proxy keeps using the same connection and doesn't
require an auth step, it doesn't do so here.</li>
- * <li>Then this method stores whether the final POST request should wait
for the auth step.
- * So that {@link #openConnectionForSend} can determine when to produce
contents.</li>
- * </ol>
- *
- * <p>
- * The above special sequence is only executed when a proxy instance is
set, and its username is set.
- * </p>
- *
- * @param post a POST request to establish transaction
- * @return POST request response
- * @throws IOException thrown if the post request failed
- */
- private HttpResponse initiateTransactionForSend(final HttpPost post)
throws IOException {
- if (shouldCheckProxyAuth()) {
- final CloseableHttpAsyncClient asyncClient = getHttpAsyncClient();
- final HttpGet get = createGetControllerRequest();
- final Future<HttpResponse> getResult = asyncClient.execute(get,
null);
- try {
- final HttpResponse getResponse =
getResult.get(readTimeoutMillis, TimeUnit.MILLISECONDS);
- logger.debug("Proxy auth check has done. getResponse={}",
getResponse.getStatusLine());
- } catch (final ExecutionException e) {
- logger.debug("Something has happened at get controller
requesting thread for proxy auth check. {}", e.getMessage());
- throw toIOException(e);
- } catch (TimeoutException | InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- final HttpAsyncRequestProducer asyncRequestProducer = new
HttpAsyncRequestProducer() {
- private boolean requestHasBeenReset = false;
-
- @Override
- public HttpHost getTarget() {
- return URIUtils.extractHost(post.getURI());
- }
-
- @Override
- public HttpRequest generateRequest() {
- final BasicHttpEntity entity = new BasicHttpEntity();
- post.setEntity(entity);
- return post;
- }
-
- @Override
- public void produceContent(ContentEncoder encoder, IOControl
ioctrl) throws IOException {
- encoder.complete();
- if (shouldCheckProxyAuth() && requestHasBeenReset) {
- logger.debug("Produced content again, assuming the proxy
server requires authentication.");
- proxyAuthRequiresResend.set(true);
- }
- }
-
- @Override
- public void requestCompleted(HttpContext context) {
- debugProxyAuthState(context);
- }
-
- @Override
- public void failed(Exception ex) {
- final String msg = String.format("Failed to create transaction
for %s", post.getURI());
- logger.error(msg, ex);
- eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY,
msg);
- }
-
- @Override
- public boolean isRepeatable() {
- return true;
- }
-
- @Override
- public void resetRequest() {
- requestHasBeenReset = true;
- }
-
- @Override
- public void close() {
- }
- };
-
- final Future<HttpResponse> responseFuture =
getHttpAsyncClient().execute(asyncRequestProducer, new
BasicAsyncResponseConsumer(), null);
- final HttpResponse response;
- try {
- response = responseFuture.get(readTimeoutMillis,
TimeUnit.MILLISECONDS);
-
- } catch (final ExecutionException e) {
- logger.debug("Something has happened at initiate transaction
requesting thread. {}", e.getMessage());
- throw toIOException(e);
- } catch (TimeoutException | InterruptedException e) {
- throw new IOException(e);
- }
- return response;
- }
-
- /**
- * Print AuthState in HttpContext for debugging purpose.
- * <p>
- * If the proxy server requires 407 and resend cycle, this method logs as
followings, for Basic Auth:
- * <ul><li>state:UNCHALLENGED;</li>
- * <li>state:CHALLENGED;auth scheme:basic;credentials present</li></ul>
- * </p>
- * <p>
- * For Digest Auth:
- * <ul><li>state:UNCHALLENGED;</li>
- * <li>state:CHALLENGED;auth scheme:digest;credentials present</li></ul>
- * </p>
- * <p>
- * But if the proxy uses the same connection, it doesn't return 407, in
such case
- * this method is called only once with:
- * <ul><li>state:UNCHALLENGED</li></ul>
- * </p>
- */
- private void debugProxyAuthState(HttpContext context) {
- final AuthState proxyAuthState;
- if (shouldCheckProxyAuth()
- && logger.isDebugEnabled()
- && (proxyAuthState = (AuthState)
context.getAttribute("http.auth.proxy-scope")) != null) {
- logger.debug("authProxyScope={}", proxyAuthState);
- }
}
private IOException toIOException(ExecutionException e) {
@@ -680,293 +350,113 @@ public class SiteToSiteRestApiClient implements
Closeable {
}
public boolean openConnectionForReceive(final String transactionUrl, final
Peer peer) throws IOException {
-
- final HttpGet get = createGet(transactionUrl + "/flow-files");
- // Set uri so that it'll be used as transit uri.
- ((HttpCommunicationsSession)
peer.getCommunicationsSession()).setDataTransferUrl(get.getURI().toString());
-
- get.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-
- setHandshakeProperties(get);
-
- final CloseableHttpResponse response = getHttpClient().execute(get);
- final int responseCode = response.getStatusLine().getStatusCode();
- logger.debug("responseCode={}", responseCode);
-
- boolean keepItOpen = false;
- try {
- switch (responseCode) {
- case RESPONSE_CODE_OK:
- logger.debug("Server returned RESPONSE_CODE_OK, indicating
there was no data.");
- EntityUtils.consume(response.getEntity());
- return false;
-
- case RESPONSE_CODE_ACCEPTED:
- final InputStream httpIn =
response.getEntity().getContent();
- final InputStream streamCapture = new InputStream() {
- boolean closed = false;
-
- @Override
- public int read() throws IOException {
- if (closed) {
- return -1;
- }
- final int r = httpIn.read();
- if (r < 0) {
- closed = true;
- logger.debug("Reached to end of input stream.
Closing resources...");
- stopExtendingTransaction();
- closeSilently(httpIn);
- closeSilently(response);
- }
- return r;
+ final URI uri = getUri(transactionUrl + "/flow-files");
+ final HttpCommunicationsSession session = (HttpCommunicationsSession)
peer.getCommunicationsSession();
+ session.setDataTransferUrl(uri.toString());
+
+ final HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(uri).GET();
+ requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
+
+ final HttpResponse<InputStream> response = sendRequest(requestBuilder);
+ final int responseCode = response.statusCode();
+ switch (responseCode) {
+ case HTTP_OK:
+ logger.debug("Server returned RESPONSE_CODE_OK, indicating
there was no data.");
+ response.body().close();
+ return false;
+ case HTTP_ACCEPTED:
+ final InputStream httpIn = response.body();
+ final InputStream streamCapture = new InputStream() {
+ boolean closed = false;
+ @Override
+ public int read() throws IOException {
+ if (closed) {
+ return -1;
}
- };
- ((HttpInput)
peer.getCommunicationsSession().getInput()).setInputStream(streamCapture);
-
- startExtendingTransaction(transactionUrl);
- keepItOpen = true;
- return true;
-
- default:
- try (InputStream content =
response.getEntity().getContent()) {
- throw handleErrResponse(responseCode, content);
+ final int r = httpIn.read();
+ if (r < 0) {
+ closed = true;
+ logger.debug("Reached to end of input stream.
Closing resources...");
+ stopExtendingTransaction();
+ closeSilently(httpIn);
+ }
+ return r;
}
- }
- } finally {
- if (!keepItOpen) {
- response.close();
- }
+ };
+ ((HttpInput) session.getInput()).setInputStream(streamCapture);
+ startExtendingTransaction(transactionUrl);
+ return true;
+ default:
+ try (InputStream content = response.body()) {
+ throw handleErrResponse(responseCode, content);
+ }
}
}
-
public void openConnectionForSend(final String transactionUrl, final Peer
peer) throws IOException {
-
- final CommunicationsSession commSession =
peer.getCommunicationsSession();
final String flowFilesPath = transactionUrl + "/flow-files";
- final HttpPost post = createPost(flowFilesPath);
- // Set uri so that it'll be used as transit uri.
- ((HttpCommunicationsSession)
peer.getCommunicationsSession()).setDataTransferUrl(post.getURI().toString());
+ final URI uri = getUri(flowFilesPath);
+ final HttpCommunicationsSession session = (HttpCommunicationsSession)
peer.getCommunicationsSession();
+ session.setDataTransferUrl(uri.toString());
- post.setHeader("Content-Type", "application/octet-stream");
- post.setHeader("Accept", "text/plain");
- post.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+ final HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(uri);
+ requestBuilder.setHeader("Content-Type", "application/octet-stream");
+ requestBuilder.setHeader(ACCEPT_HEADER, "text/plain");
+ requestBuilder.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+ setRequestHeaders(requestBuilder);
- setHandshakeProperties(post);
-
- final CountDownLatch initConnectionLatch = new CountDownLatch(1);
-
- final URI requestUri = post.getURI();
final PipedOutputStream outputStream = new PipedOutputStream();
final PipedInputStream inputStream = new
PipedInputStream(outputStream, DATA_PACKET_CHANNEL_READ_BUFFER_SIZE);
- final ReadableByteChannel dataPacketChannel =
Channels.newChannel(inputStream);
- final HttpAsyncRequestProducer asyncRequestProducer = new
HttpAsyncRequestProducer() {
-
- private final ByteBuffer buffer =
ByteBuffer.allocate(DATA_PACKET_CHANNEL_READ_BUFFER_SIZE);
-
- private long totalRead = 0;
- private long totalProduced = 0;
-
- private boolean requestHasBeenReset = false;
-
-
- @Override
- public HttpHost getTarget() {
- return URIUtils.extractHost(requestUri);
- }
-
- @Override
- public HttpRequest generateRequest() {
-
- // Pass the output stream so that Site-to-Site client thread
can send
- // data packet through this connection.
- logger.debug("sending data to {} has started...",
flowFilesPath);
- ((HttpOutput)
commSession.getOutput()).setOutputStream(outputStream);
- initConnectionLatch.countDown();
-
- final BasicHttpEntity entity = new BasicHttpEntity();
- entity.setChunked(true);
- entity.setContentType("application/octet-stream");
- post.setEntity(entity);
- return post;
- }
-
- private final AtomicBoolean bufferHasRemainingData = new
AtomicBoolean(false);
-
- /**
- * If the proxy server requires authentication, the same POST
request has to be sent again.
- * The first request will result 407, then the next one will be
sent with auth headers and actual data.
- * This method produces a content only when it's need to be sent,
to avoid producing the flow-file contents twice.
- * Whether we need to wait auth is determined heuristically by the
previous POST request which creates transaction.
- * See {@link
SiteToSiteRestApiClient#initiateTransactionForSend(HttpPost)} for further
detail.
- */
- @Override
- public void produceContent(final ContentEncoder encoder, final
IOControl ioControl) throws IOException {
-
- if (shouldCheckProxyAuth() && proxyAuthRequiresResend.get() &&
!requestHasBeenReset) {
- logger.debug("Need authentication with proxy server.
Postpone producing content.");
- encoder.complete();
- return;
- }
-
- if (bufferHasRemainingData.get()) {
- // If there's remaining buffer last time, send it first.
- writeBuffer(encoder);
- if (bufferHasRemainingData.get()) {
- return;
- }
- }
-
- int read;
- // This read() blocks until data becomes available,
- // or corresponding outputStream is closed.
- if ((read = dataPacketChannel.read(buffer)) > -1) {
-
- logger.trace("Read {} bytes from dataPacketChannel. {}",
read, flowFilesPath);
- totalRead += read;
-
- buffer.flip();
- writeBuffer(encoder);
-
- } else {
-
- final long totalWritten =
commSession.getOutput().getBytesWritten();
- logger.debug("sending data to {} has reached to its end.
produced {} bytes by reading {} bytes from channel. {} bytes written in this
transaction.",
- flowFilesPath, totalProduced, totalRead,
totalWritten);
-
- if (totalRead != totalWritten || totalProduced !=
totalWritten) {
- final String msg = "Sending data to %s has reached to
its end, but produced : read : wrote byte sizes (%d : %d : %d) were not equal.
Something went wrong.";
- throw new RuntimeException(String.format(msg,
flowFilesPath, totalProduced, totalRead, totalWritten));
- }
- transferDataLatch.countDown();
- encoder.complete();
- dataPacketChannel.close();
- }
-
- }
-
- private void writeBuffer(ContentEncoder encoder) throws
IOException {
- while (buffer.hasRemaining()) {
- final int written = encoder.write(buffer);
- logger.trace("written {} bytes to encoder.", written);
- if (written == 0) {
- logger.trace("Buffer still has remaining. {}", buffer);
- bufferHasRemainingData.set(true);
- return;
- }
- totalProduced += written;
- }
- bufferHasRemainingData.set(false);
- buffer.clear();
- }
-
- @Override
- public void requestCompleted(final HttpContext context) {
- logger.debug("Sending data to {} completed.", flowFilesPath);
- debugProxyAuthState(context);
- }
-
- @Override
- public void failed(final Exception ex) {
- final String msg = String.format("Failed to send data to %s
due to %s", flowFilesPath, ex.toString());
- logger.error(msg, ex);
- eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY,
msg);
- }
-
- @Override
- public boolean isRepeatable() {
- // In order to pass authentication, request has to be
repeatable.
- return true;
- }
-
- @Override
- public void resetRequest() {
- logger.debug("Sending data request to {} has been reset...",
flowFilesPath);
- requestHasBeenReset = true;
- }
-
- @Override
- public void close() {
- logger.debug("Closing sending data request to {}",
flowFilesPath);
- closeSilently(outputStream);
- closeSilently(dataPacketChannel);
- stopExtendingTransaction();
- }
- };
-
- postResult = getHttpAsyncClient().execute(asyncRequestProducer, new
BasicAsyncResponseConsumer(), null);
-
- try {
- // Need to wait the post request actually started so that we can
write to its output stream.
- if (!initConnectionLatch.await(connectTimeoutMillis,
TimeUnit.MILLISECONDS)) {
- throw new IOException("Awaiting initConnectionLatch has been
timeout.");
- }
-
- // Started.
- transferDataLatch = new CountDownLatch(1);
- startExtendingTransaction(transactionUrl);
-
- } catch (final InterruptedException e) {
- throw new IOException("Awaiting initConnectionLatch has been
interrupted.", e);
- }
+ final HttpOutput httpOutput = (HttpOutput) session.getOutput();
+ httpOutput.setOutputStream(outputStream);
+ requestBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(
+ () -> inputStream
+ ));
+ final HttpRequest request = requestBuilder.build();
+ transactionFuture = httpClient.sendAsync(request,
HttpResponse.BodyHandlers.ofString());
+ startExtendingTransaction(transactionUrl);
}
public void finishTransferFlowFiles(final CommunicationsSession
commSession) throws IOException {
-
- if (postResult == null) {
- new IllegalStateException("Data transfer has not started yet.");
+ if (transactionFuture == null) {
+ throw new IllegalStateException("Data Transfer not started");
}
- // No more data can be sent.
- // Close PipedOutputStream so that dataPacketChannel doesn't get
blocked.
- // If we don't close this output stream, then PipedInputStream loops
infinitely at read().
+ // Close PipedOutputStream from openConnectionForSend() to avoid
blocking on PipedInputStream.read()
commSession.getOutput().getOutputStream().close();
- logger.debug("{} FinishTransferFlowFiles no more data can be sent",
this);
-
- try {
- if (!transferDataLatch.await(requestExpirationMillis,
TimeUnit.MILLISECONDS)) {
- throw new IOException("Awaiting transferDataLatch has been
timeout.");
- }
- } catch (final InterruptedException e) {
- throw new IOException("Awaiting transferDataLatch has been
interrupted.", e);
- }
stopExtendingTransaction();
- final HttpResponse response;
+ final HttpResponse<String> response;
try {
- response = postResult.get(readTimeoutMillis,
TimeUnit.MILLISECONDS);
+ response = transactionFuture.get(readTimeoutMillis,
TimeUnit.MILLISECONDS);
} catch (final ExecutionException e) {
- logger.debug("Something has happened at sending data thread. {}",
e.getMessage());
throw toIOException(e);
} catch (TimeoutException | InterruptedException e) {
throw new IOException(e);
}
- final int responseCode = response.getStatusLine().getStatusCode();
- if (responseCode == RESPONSE_CODE_ACCEPTED) {
- final String receivedChecksum =
EntityUtils.toString(response.getEntity());
+ final int responseCode = response.statusCode();
+ if (responseCode == HTTP_ACCEPTED) {
+ final String receivedChecksum = response.body();
((HttpInput) commSession.getInput()).setInputStream(new
ByteArrayInputStream(receivedChecksum.getBytes()));
((HttpCommunicationsSession)
commSession).setChecksum(receivedChecksum);
logger.debug("receivedChecksum={}", receivedChecksum);
} else {
- try (InputStream content = response.getEntity().getContent()) {
+ try (InputStream content = new
ByteArrayInputStream(response.body().getBytes(StandardCharsets.UTF_8))) {
throw handleErrResponse(responseCode, content);
}
}
}
private void startExtendingTransaction(final String transactionUrl) {
- if (ttlExtendingFuture != null) {
- return;
+ if (ttlExtendingFuture == null) {
+ final int extendFrequency = serverTransactionTtl / 2;
+ logger.debug("Extend Transaction Started [{}] Frequency [{}
seconds]", transactionUrl, extendFrequency);
+ final Runnable command = new ExtendTransactionCommand(this,
transactionUrl, eventReporter);
+ ttlExtendingFuture =
ttlExtendTaskExecutor.scheduleWithFixedDelay(command, extendFrequency,
extendFrequency, TimeUnit.SECONDS);
}
-
- final int extendFrequency = serverTransactionTtl / 2;
- logger.debug("Extend Transaction Started [{}] Frequency [{} seconds]",
transactionUrl, extendFrequency);
- final Runnable command = new ExtendTransactionCommand(this,
transactionUrl, eventReporter);
- ttlExtendingFuture =
ttlExtendTaskExecutor.scheduleWithFixedDelay(command, extendFrequency,
extendFrequency, TimeUnit.SECONDS);
}
private void closeSilently(final Closeable closeable) {
@@ -975,35 +465,15 @@ public class SiteToSiteRestApiClient implements Closeable
{
closeable.close();
}
} catch (final IOException e) {
- logger.warn("Got an exception when closing {}: {}", closeable,
e.getMessage());
- if (logger.isDebugEnabled()) {
- logger.warn("", e);
- }
+ logger.debug("Failed close [{}]", closeable, e);
}
}
public TransactionResultEntity extendTransaction(final String
transactionUrl) throws IOException {
logger.debug("Sending extendTransaction request to transactionUrl:
{}", transactionUrl);
-
- final HttpPut put = createPut(transactionUrl);
-
- put.setHeader("Accept", "application/json");
- put.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-
- setHandshakeProperties(put);
-
- try (final CloseableHttpResponse response =
getHttpClient().execute(put)) {
- final int responseCode = response.getStatusLine().getStatusCode();
- logger.debug("extendTransaction responseCode={}", responseCode);
-
- try (final InputStream content =
response.getEntity().getContent()) {
- if (responseCode == RESPONSE_CODE_OK) {
- return readResponse(content);
- }
- throw handleErrResponse(responseCode, content);
- }
- }
-
+ final URI uri = getUri(transactionUrl);
+ final HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(uri).PUT(HttpRequest.BodyPublishers.noBody());
+ return send(requestBuilder, TransactionResultEntity.class);
}
private void stopExtendingTransaction() {
@@ -1018,10 +488,6 @@ public class SiteToSiteRestApiClient implements Closeable
{
}
private IOException handleErrResponse(final int responseCode, final
InputStream in) throws IOException {
- if (in == null) {
- return new IOException("Unexpected response code: " +
responseCode);
- }
-
final TransactionResultEntity errEntity = readResponse(in);
final ResponseCode errCode =
ResponseCode.fromCode(errEntity.getResponseCode());
@@ -1029,7 +495,7 @@ public class SiteToSiteRestApiClient implements Closeable {
case UNKNOWN_PORT -> new
UnknownPortException(errEntity.getMessage());
case PORT_NOT_IN_VALID_STATE -> new
PortNotRunningException(errEntity.getMessage());
default -> {
- if (responseCode == RESPONSE_CODE_FORBIDDEN) {
+ if (responseCode == HTTP_FORBIDDEN) {
yield new HandshakeException(errEntity.getMessage());
}
yield new IOException("Unexpected response code: " +
responseCode + " errCode:" + errCode + " errMessage:" + errEntity.getMessage());
@@ -1044,16 +510,9 @@ public class SiteToSiteRestApiClient implements Closeable
{
String responseMessage = null;
try {
- responseMessage = new String(bos.toByteArray(),
StandardCharsets.UTF_8);
- logger.debug("readResponse responseMessage={}", responseMessage);
-
- final ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(responseMessage,
TransactionResultEntity.class);
- } catch (JsonParseException | JsonMappingException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Failed to parse JSON.", e);
- }
-
+ responseMessage = bos.toString(StandardCharsets.UTF_8);
+ return objectMapper.readValue(responseMessage,
TransactionResultEntity.class);
+ } catch (final JsonParseException | JsonMappingException e) {
final TransactionResultEntity entity = new
TransactionResultEntity();
entity.setResponseCode(ResponseCode.ABORT.getCode());
entity.setMessage(responseMessage);
@@ -1061,41 +520,41 @@ public class SiteToSiteRestApiClient implements
Closeable {
}
}
- private String readTransactionUrl(final HttpResponse response) {
- final Header locationUriIntentHeader =
response.getFirstHeader(LOCATION_URI_INTENT_NAME);
+ private String readTransactionUrl(final HttpResponse<InputStream>
response) {
+ final Optional<String> locationUriIntentHeader =
response.headers().firstValue(LOCATION_URI_INTENT_NAME);
logger.debug("locationUriIntentHeader={}", locationUriIntentHeader);
- if (locationUriIntentHeader != null &&
LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.getValue())) {
- final Header transactionUrl =
response.getFirstHeader(LOCATION_HEADER_NAME);
+ if (locationUriIntentHeader.isPresent() &&
LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.get())) {
+ final Optional<String> transactionUrl =
response.headers().firstValue(LOCATION_HEADER_NAME);
logger.debug("transactionUrl={}", transactionUrl);
- if (transactionUrl != null) {
- return transactionUrl.getValue();
+ if (transactionUrl.isPresent()) {
+ return transactionUrl.get();
}
}
return null;
}
- private void setHandshakeProperties(final HttpRequestBase httpRequest) {
+ private void setRequestHeaders(final HttpRequest.Builder requestBuilder) {
if (compress) {
- httpRequest.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION, "true");
+ requestBuilder.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION,
Boolean.TRUE.toString());
}
if (requestExpirationMillis > 0) {
- httpRequest.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION,
String.valueOf(requestExpirationMillis));
+ requestBuilder.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION,
String.valueOf(requestExpirationMillis));
}
if (batchCount > 0) {
- httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT,
String.valueOf(batchCount));
+ requestBuilder.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT,
String.valueOf(batchCount));
}
if (batchSize > 0) {
- httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE,
String.valueOf(batchSize));
+ requestBuilder.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE,
String.valueOf(batchSize));
}
if (batchDurationMillis > 0) {
- httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION,
String.valueOf(batchDurationMillis));
+ requestBuilder.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION,
String.valueOf(batchDurationMillis));
}
}
@@ -1116,114 +575,18 @@ public class SiteToSiteRestApiClient implements
Closeable {
return url;
}
-
- private HttpGet createGet(final String path) {
- final URI url = getUri(path);
- final HttpGet get = new HttpGet(url);
- get.setConfig(getRequestConfig());
- return get;
- }
-
- private HttpPost createPost(final String path) {
- final URI url = getUri(path);
- final HttpPost post = new HttpPost(url);
- post.setConfig(getRequestConfig());
- return post;
- }
-
- private HttpPut createPut(final String path) {
- final URI url = getUri(path);
- final HttpPut put = new HttpPut(url);
- put.setConfig(getRequestConfig());
- return put;
- }
-
- private HttpDelete createDelete(final String path) {
- final URI url = getUri(path);
- final HttpDelete delete = new HttpDelete(url);
- delete.setConfig(getRequestConfig());
- return delete;
- }
-
- private String execute(final HttpGet get) throws IOException {
- final CloseableHttpClient httpClient = getHttpClient();
-
- if (logger.isTraceEnabled()) {
- Arrays.stream(get.getAllHeaders()).forEach(h -> logger.debug("REQ|
{}", h));
- }
-
- try (final CloseableHttpResponse response = httpClient.execute(get)) {
- if (logger.isTraceEnabled()) {
- Arrays.stream(response.getAllHeaders()).forEach(h ->
logger.debug("RES| {}", h));
- }
-
- final StatusLine statusLine = response.getStatusLine();
- final int statusCode = statusLine.getStatusCode();
- if (RESPONSE_CODE_OK != statusCode) {
- throw new HttpGetFailedException(statusCode,
statusLine.getReasonPhrase(), null);
- }
- final HttpEntity entity = response.getEntity();
- final String responseMessage = EntityUtils.toString(entity);
- return responseMessage;
- }
- }
-
- public class HttpGetFailedException extends IOException {
- private static final long serialVersionUID = 7920714957269466946L;
-
- private final int responseCode;
- private final String responseMessage;
- private final String explanation;
-
- public HttpGetFailedException(final int responseCode, final String
responseMessage, final String explanation) {
- super("response code " + responseCode + ":" + responseMessage + "
with explanation: " + explanation);
- this.responseCode = responseCode;
- this.responseMessage = responseMessage;
- this.explanation = explanation;
- }
-
- public int getResponseCode() {
- return responseCode;
- }
-
- public String getDescription() {
- return StringUtils.isNotEmpty(explanation) ? explanation :
responseMessage;
- }
- }
-
-
- private <T> T execute(final HttpGet get, final Class<T> entityClass)
throws IOException {
- get.setHeader("Accept", "application/json");
- final String responseMessage = execute(get);
-
- final ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
-
- try {
- return mapper.readValue(responseMessage, entityClass);
- } catch (JsonParseException e) {
- final String msg = "Failed to parse Json. The specified URL " +
baseUrl + " is not a proper remote NiFi endpoint for Site-to-Site
communication.";
- logger.warn("{} requestedUrl={}, response={}", msg, get.getURI(),
responseMessage);
- throw new IOException(msg, e);
- }
- }
-
public String getBaseUrl() {
return baseUrl;
}
- /**
- * Set the baseUrl as it is, without altering or adjusting the specified
url string.
- * If the url is specified by user input, and if it needs to be resolved
with leniency,
- * then use {@link #resolveBaseUrl(String)} method before passing it to
this method.
- * @param baseUrl url to set
- */
public void setBaseUrl(final String baseUrl) {
this.baseUrl = baseUrl;
}
public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
- this.connectTimeoutMillis = connectTimeoutMillis;
+
httpClientBuilder.connectTimeout(Duration.ofMillis(connectTimeoutMillis));
+ httpClient.close();
+ httpClient = httpClientBuilder.build();
}
public void setReadTimeoutMillis(final int readTimeoutMillis) {
@@ -1234,144 +597,12 @@ public class SiteToSiteRestApiClient implements
Closeable {
this.cacheExpirationMillis = expirationMillis;
}
- public static String getFirstUrl(final String clusterUrlStr) {
- if (clusterUrlStr == null) {
- return null;
- }
-
- final int commaIndex = clusterUrlStr.indexOf(',');
- if (commaIndex > -1) {
- return clusterUrlStr.substring(0, commaIndex);
- }
- return clusterUrlStr;
- }
-
- /**
- * Parse the comma-separated URLs string for the remote NiFi instances.
- * @return A set containing one or more URLs
- * @throws IllegalArgumentException when it fails to parse the URLs string,
- * URLs string contains multiple protocols (http and https mix),
- * or none of URL is specified.
- */
- public static Set<String> parseClusterUrls(final String clusterUrlStr) {
- final Set<String> urls = new LinkedHashSet<>();
- if (clusterUrlStr != null && !clusterUrlStr.isEmpty()) {
- Arrays.stream(clusterUrlStr.split(","))
- .map(s -> s.trim())
- .filter(s -> !s.isEmpty())
- .forEach(s -> {
- validateUriString(s);
- urls.add(resolveBaseUrl(s).intern());
- });
- }
-
- if (urls.isEmpty()) {
- throw new IllegalArgumentException("Cluster URL was not
specified.");
- }
-
- final Predicate<String> isHttps = url ->
url.toLowerCase().startsWith("https:");
- if (urls.stream().anyMatch(isHttps) &&
urls.stream().anyMatch(isHttps.negate())) {
- throw new IllegalArgumentException("Different protocols are used
in the cluster URLs " + clusterUrlStr);
- }
-
- return Collections.unmodifiableSet(urls);
- }
-
- private static void validateUriString(String s) {
- // parse the uri
- final URI uri;
- try {
- uri = URI.create(s);
- } catch (final IllegalArgumentException e) {
- throw new IllegalArgumentException("The specified remote process
group URL is malformed: " + s);
- }
-
- // validate each part of the uri
- if (uri.getScheme() == null || uri.getHost() == null) {
- throw new IllegalArgumentException("The specified remote process
group URL is malformed: " + s);
- }
-
- if (!(uri.getScheme().equalsIgnoreCase("http") ||
uri.getScheme().equalsIgnoreCase("https"))) {
- throw new IllegalArgumentException("The specified remote process
group URL is invalid because it is not http or https: " + s);
- }
- }
-
- private static String resolveBaseUrl(final String clusterUrl) {
- Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null.");
- final URI uri;
- try {
- uri = new URI(clusterUrl.trim());
- } catch (final URISyntaxException e) {
- throw new IllegalArgumentException("The specified URL is
malformed: " + clusterUrl);
- }
-
- return resolveBaseUrl(uri);
- }
-
- /**
- * Resolve NiFi API url with leniency. This method does following
conversion on uri path:
- * <ul>
- * <li>/ to /nifi-api</li>
- * <li>/nifi to /nifi-api</li>
- * <li>/some/path/ to /some/path/nifi-api</li>
- * </ul>
- * @param clusterUrl url to be resolved
- * @return resolved url
- */
- private static String resolveBaseUrl(final URI clusterUrl) {
-
- if (clusterUrl.getScheme() == null || clusterUrl.getHost() == null) {
- throw new IllegalArgumentException("The specified URL is
malformed: " + clusterUrl);
- }
-
- if (!(clusterUrl.getScheme().equalsIgnoreCase("http") ||
clusterUrl.getScheme().equalsIgnoreCase("https"))) {
- throw new IllegalArgumentException("The specified URL is invalid
because it is not http or https: " + clusterUrl);
- }
-
-
- String uriPath = clusterUrl.getPath().trim();
-
- if (StringUtils.isEmpty(uriPath) || uriPath.equals("/")) {
- uriPath = "/nifi";
- } else if (uriPath.endsWith("/")) {
- uriPath = uriPath.substring(0, uriPath.length() - 1);
- }
-
- final StringBuilder uriPathBuilder = new StringBuilder(uriPath);
- if (uriPath.endsWith("/nifi")) {
- uriPathBuilder.append("-api");
- } else if (!uriPath.endsWith("/nifi-api")) {
- uriPathBuilder.append("/nifi-api");
- }
-
- try {
- return new URIBuilder()
- .setScheme(clusterUrl.getScheme())
- .setHost(clusterUrl.getHost())
- .setPort(clusterUrl.getPort())
- .setPath(uriPathBuilder.toString())
- .build()
- .toString();
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
public void setBaseUrl(final String scheme, final String host, final int
port) {
- setBaseUrl(scheme, host, port, "/nifi-api");
- }
-
- private void setBaseUrl(final String scheme, final String host, final int
port, final String path) {
final String baseUri;
try {
- baseUri = new URIBuilder()
- .setScheme(scheme)
- .setHost(host)
- .setPort(port)
- .setPath(path)
- .build()
- .toString();
- } catch (URISyntaxException e) {
+ final URI uri = new URI(scheme, null, host, port, "/nifi-api",
null, null);
+ baseUri = uri.toString();
+ } catch (final URISyntaxException e) {
throw new IllegalArgumentException(e);
}
this.setBaseUrl(baseUri);
@@ -1382,7 +613,9 @@ public class SiteToSiteRestApiClient implements Closeable {
}
public void setLocalAddress(final InetAddress localAddress) {
- this.localAddress = localAddress;
+ httpClientBuilder.localAddress(localAddress);
+ httpClient.close();
+ httpClient = httpClientBuilder.build();
}
public void setRequestExpirationMillis(final long requestExpirationMillis)
{
@@ -1432,50 +665,80 @@ public class SiteToSiteRestApiClient implements
Closeable {
urlBuilder.append("&checksum=").append(checksum);
}
- final HttpDelete delete = createDelete(urlBuilder.toString());
- delete.setHeader("Accept", "application/json");
- delete.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-
- setHandshakeProperties(delete);
-
- try (CloseableHttpResponse response = getHttpClient().execute(delete))
{
- final int responseCode = response.getStatusLine().getStatusCode();
- logger.debug("commitReceivingFlowFiles responseCode={}",
responseCode);
+ final HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(getUri(urlBuilder.toString())).DELETE();
+ requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
- try (InputStream content = response.getEntity().getContent()) {
- return switch (responseCode) {
- case RESPONSE_CODE_OK -> readResponse(content);
- case RESPONSE_CODE_BAD_REQUEST -> readResponse(content);
- default -> throw handleErrResponse(responseCode, content);
- };
- }
+ final HttpResponse<InputStream> response = sendRequest(requestBuilder);
+ final int responseCode = response.statusCode();
+ try (InputStream content = response.body()) {
+ return switch (responseCode) {
+ case HTTP_OK, HTTP_BAD_REQUEST -> readResponse(content);
+ default -> throw handleErrResponse(responseCode, content);
+ };
}
-
}
public TransactionResultEntity commitTransferFlowFiles(final String
transactionUrl, final ResponseCode clientResponse) throws IOException {
final String requestUrl = transactionUrl + "?responseCode=" +
clientResponse.getCode();
logger.debug("Sending commitTransferFlowFiles request to
transactionUrl: {}", requestUrl);
- final HttpDelete delete = createDelete(requestUrl);
- delete.setHeader("Accept", "application/json");
- delete.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+ final HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder(getUri(requestUrl)).DELETE();
+ requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
- setHandshakeProperties(delete);
+ final HttpResponse<InputStream> response = sendRequest(requestBuilder);
+ final int responseCode = response.statusCode();
+ try (InputStream content = response.body()) {
+ return switch (responseCode) {
+ case HTTP_OK, HTTP_BAD_REQUEST -> readResponse(content);
+ default -> throw handleErrResponse(responseCode, content);
+ };
+ }
+ }
- try (CloseableHttpResponse response = getHttpClient().execute(delete))
{
- final int responseCode = response.getStatusLine().getStatusCode();
- logger.debug("commitTransferFlowFiles responseCode={}",
responseCode);
+ private <T> T send(final HttpRequest.Builder requestBuilder, final
Class<T> responseClass) throws IOException {
+ requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
+ final HttpResponse<InputStream> response = sendRequest(requestBuilder);
+ final int statusCode = response.statusCode();
- try (InputStream content = response.getEntity().getContent()) {
- return switch (responseCode) {
- case RESPONSE_CODE_OK -> readResponse(content);
- case RESPONSE_CODE_BAD_REQUEST -> readResponse(content);
- default -> throw handleErrResponse(responseCode, content);
- };
+ try (InputStream inputStream = response.body()) {
+ if (HTTP_OK == statusCode) {
+ return objectMapper.readValue(inputStream, responseClass);
+ } else {
+ throw new IOException("Request URI [%s] HTTP
%d".formatted(response.uri(), statusCode));
}
}
+ }
+
+ private HttpResponse<InputStream> sendRequest(final HttpRequest.Builder
requestBuilder) throws IOException {
+ setRequestHeaders(requestBuilder);
+
+ requestBuilder.setHeader(HttpHeaders.PROTOCOL_VERSION,
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+ requestBuilder.timeout(Duration.ofMillis(readTimeoutMillis));
+
+ final HttpRequest request = requestBuilder.build();
+
+ try {
+ final HttpResponse<InputStream> response =
httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
+ final Optional<SSLSession> sslSessionFound = response.sslSession();
+ sslSessionFound.ifPresent(this::setTrustedPeerDn);
+ return response;
+ } catch (final InterruptedException e) {
+ throw new IOException("Request URI [%s]
interrupted".formatted(request.uri()), e);
+ }
+ }
+ private void setTrustedPeerDn(final SSLSession sslSession) {
+ try {
+ final Certificate[] peerCertificates =
sslSession.getPeerCertificates();
+ if (peerCertificates.length == 0) {
+ logger.info("Peer Certificates not found");
+ } else {
+ final X509Certificate peerCertificate = (X509Certificate)
peerCertificates[0];
+ trustedPeerDn =
StandardPrincipalFormatter.getInstance().getSubject(peerCertificate);
+ }
+ } catch (final SSLPeerUnverifiedException e) {
+ logger.warn("Peer Certificate verification failed", e);
+ }
}
private static class RemoteGroupContents {
diff --git
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/SiteToSiteRestApiClientTest.java
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/SiteToSiteRestApiClientTest.java
new file mode 100644
index 0000000000..2fbdbdd049
--- /dev/null
+++
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/SiteToSiteRestApiClientTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import mockwebserver3.RecordedRequest;
+import okhttp3.Headers;
+import okhttp3.HttpUrl;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.SiteToSiteEventReporter;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.protocol.http.HttpHeaders;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.remote.PeerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.api.entity.PeersEntity;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static java.net.HttpURLConnection.HTTP_ACCEPTED;
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PROXY_AUTH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+@Timeout(5)
+class SiteToSiteRestApiClientTest {
+
+ private static final int TIMEOUT = 5;
+
+ private static final String APPLICATION_JSON = "application/json";
+ private static final String APPLICATION_OCTET_STREAM =
"application/octet-stream";
+ private static final String TEXT_PLAIN = "text/plain";
+
+ private static final String ACCEPT_HEADER = "Accept";
+ private static final String CONTENT_TYPE_HEADER = "Content-Type";
+ private static final String PROXY_AUTHENTICATE_HEADER =
"Proxy-Authenticate";
+ private static final String PROXY_AUTHORIZATION_HEADER =
"Proxy-Authorization";
+ private static final String BASIC_REALM = "Basic realm=\"NiFi\"";
+
+ private static final String NIFI_API_PATH = "/nifi-api";
+ private static final String SITE_TO_SITE_PATH = "/site-to-site";
+ private static final String SITE_TO_SITE_PEERS_PATH =
"/site-to-site/peers";
+ private static final String RECEIVE_TRANSACTION_PATH =
"/data-transfer/output-ports/port-identifier/transactions";
+ private static final String RECEIVE_TRANSACTION_FORMAT =
"%s/data-transfer/output-ports/port-identifier/transactions";
+ private static final String SEND_TRANSACTION_PATH =
"/data-transfer/input-ports/port-identifier/transactions";
+ private static final String SEND_TRANSACTION_FORMAT =
"%s/data-transfer/input-ports/port-identifier/transactions";
+ private static final String FLOW_FILES_PATH_FORMAT = "%s/flow-files";
+
+ private static final String DELETE_METHOD = "DELETE";
+ private static final String GET_METHOD = "GET";
+ private static final String POST_METHOD = "POST";
+ private static final String PUT_METHOD = "PUT";
+
+ private static final String CHECKSUM_PARAMETER = "checksum";
+ private static final String RESPONSE_CODE_PARAMETER = "responseCode";
+
+ private static final int LATEST_PROTOCOL_VERSION = 5;
+ private static final String FIRST_PROTOCOL_VERSION = "1";
+ private static final String CHECKSUM = "CHECKSUM";
+ private static final String PORT_ID = "port-identifier";
+ private static final int TRANSACTION_TTL = 5000;
+ private static final int READ_TIMEOUT = 5000;
+
+ private static final String PROXY_USER = "user";
+ private static final String PROXY_PASS = "granted";
+ private static final String PROXY_CREDENTIALS = "Basic dXNlcjpncmFudGVk";
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ private final MockWebServer server = new MockWebServer();
+
+ private SiteToSiteRestApiClient apiClient;
+
+ @BeforeEach
+ void setApiClient() throws IOException {
+ server.start();
+ apiClient = new SiteToSiteRestApiClient(null, null,
SiteToSiteEventReporter.DISABLED);
+ apiClient.setReadTimeoutMillis(READ_TIMEOUT);
+ apiClient.setRequestExpirationMillis(READ_TIMEOUT);
+ apiClient.setConnectTimeoutMillis(READ_TIMEOUT);
+ apiClient.setCacheExpirationMillis(READ_TIMEOUT);
+
+ final InetAddress localAddress = InetAddress.getByName("127.0.0.1");
+ apiClient.setLocalAddress(localAddress);
+ }
+
+ @AfterEach
+ void stopServer() throws IOException {
+ server.close();
+ apiClient.close();
+ }
+
+ @Test
+ void testSetBaseUrl() {
+ final HttpUrl url = server.url(NIFI_API_PATH);
+
+ apiClient.setBaseUrl(url.scheme(), url.host(), url.port());
+
+ final String baseUrl = apiClient.getBaseUrl();
+ assertEquals(url.toString(), baseUrl);
+ }
+
+ @Test
+ void testGetController() throws IOException, InterruptedException {
+ enqueueController();
+
+ final String serverUrl = getServerUrl();
+ final ControllerDTO controller = apiClient.getController(serverUrl);
+
+ assertNotNull(controller);
+ assertEquals(ControllerDTO.class.getName(), controller.getId());
+
+ assertRequestRecorded(SITE_TO_SITE_PATH);
+ assertEquals(LATEST_PROTOCOL_VERSION,
apiClient.getTransactionProtocolVersion());
+ }
+
+ @Test
+ void testGetControllerProxy() throws IOException, InterruptedException {
+ final Proxy proxyAddress = server.getProxyAddress();
+ final InetSocketAddress proxyServerAddress = (InetSocketAddress)
proxyAddress.address();
+ final HttpProxy httpProxy = new
HttpProxy(proxyServerAddress.getHostName(), proxyServerAddress.getPort(),
PROXY_USER, PROXY_PASS);
+ apiClient = new SiteToSiteRestApiClient(null, httpProxy,
SiteToSiteEventReporter.DISABLED);
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+ apiClient.setReadTimeoutMillis(READ_TIMEOUT);
+
+ server.enqueue(new MockResponse.Builder()
+ .code(HTTP_PROXY_AUTH)
+ .setHeader(PROXY_AUTHENTICATE_HEADER, BASIC_REALM)
+ .build()
+ );
+
+ enqueueController();
+
+ final ControllerDTO controller = apiClient.getController(serverUrl);
+ assertEquals(ControllerDTO.class.getName(), controller.getId());
+
+ assertRequestRecorded(SITE_TO_SITE_PATH);
+
+ final RecordedRequest authenticatedRequest =
assertRequestRecorded(SITE_TO_SITE_PATH);
+ final Headers authenticatedHeaders = authenticatedRequest.getHeaders();
+ final String proxyAuthorization =
authenticatedHeaders.get(PROXY_AUTHORIZATION_HEADER);
+ assertEquals(PROXY_CREDENTIALS, proxyAuthorization);
+ }
+
+ @Test
+ void testGetPeers() throws IOException, InterruptedException {
+ final PeersEntity entity = new PeersEntity();
+ entity.setPeers(List.of());
+
+ enqueueResponseBody(entity);
+
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+
+ final Collection<PeerDTO> peers = apiClient.getPeers();
+ assertNotNull(peers);
+
+ final RecordedRequest request =
assertRequestRecorded(SITE_TO_SITE_PEERS_PATH);
+ assertProtocolVersionFound(request);
+ }
+
+ @Test
+ void testCommitTransferFlowFiles() throws IOException,
InterruptedException {
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+
+ final TransactionResultEntity entity = new TransactionResultEntity();
+ enqueueResponseBody(entity);
+
+ final ResponseCode responseCode = ResponseCode.PROPERTIES_OK;
+ final TransactionResultEntity resultEntity =
apiClient.commitTransferFlowFiles(RECEIVE_TRANSACTION_PATH, responseCode);
+
+ assertNotNull(resultEntity);
+ final RecordedRequest request =
assertRequestRecorded(RECEIVE_TRANSACTION_PATH);
+ assertEquals(DELETE_METHOD, request.getMethod());
+
+ assertResponseCodeEquals(request, responseCode);
+ assertProtocolVersionFound(request);
+ }
+
+ @Test
+ void testCommitReceivingFlowFiles() throws IOException,
InterruptedException {
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+
+ final TransactionResultEntity entity = new TransactionResultEntity();
+ enqueueResponseBody(entity);
+
+ final ResponseCode responseCode = ResponseCode.CONFIRM_TRANSACTION;
+ final TransactionResultEntity resultEntity =
apiClient.commitReceivingFlowFiles(RECEIVE_TRANSACTION_PATH, responseCode,
CHECKSUM);
+
+ assertNotNull(resultEntity);
+ final RecordedRequest request =
assertRequestRecorded(RECEIVE_TRANSACTION_PATH);
+ assertEquals(DELETE_METHOD, request.getMethod());
+
+ final String checksumParameter =
request.getUrl().queryParameter(CHECKSUM_PARAMETER);
+ assertEquals(CHECKSUM, checksumParameter);
+
+ assertResponseCodeEquals(request, responseCode);
+ assertProtocolVersionFound(request);
+ }
+
+ @Test
+ void testInitiateTransactionCreated() throws IOException,
InterruptedException {
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+
+ initiateTransaction(serverUrl, TransferDirection.RECEIVE);
+ }
+
+ @Test
+ void testInitiateTransactionSendCreated() throws IOException,
InterruptedException {
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+
+ initiateTransaction(serverUrl, TransferDirection.SEND);
+ }
+
+ @Test
+ void testInitiateTransactionSendProxyAuthentication() throws IOException,
InterruptedException {
+ final Proxy proxyAddress = server.getProxyAddress();
+ final InetSocketAddress proxyServerAddress = (InetSocketAddress)
proxyAddress.address();
+ final HttpProxy httpProxy = new
HttpProxy(proxyServerAddress.getHostName(), proxyServerAddress.getPort(),
PROXY_USER, PROXY_PASS);
+ apiClient = new SiteToSiteRestApiClient(null, httpProxy,
SiteToSiteEventReporter.DISABLED);
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+ apiClient.setReadTimeoutMillis(READ_TIMEOUT);
+
+ enqueueController();
+ enqueueTransaction(serverUrl);
+
+ final String transactionUri =
apiClient.initiateTransaction(TransferDirection.SEND, PORT_ID);
+ assertEquals(serverUrl, transactionUri);
+
+ final RecordedRequest controllerRequest =
assertRequestRecorded(SITE_TO_SITE_PATH);
+ assertEquals(GET_METHOD, controllerRequest.getMethod());
+
+ final RecordedRequest request =
assertRequestRecorded(SEND_TRANSACTION_PATH);
+ assertEquals(POST_METHOD, request.getMethod());
+ assertProtocolVersionFound(request);
+ }
+
+ @Test
+ void testExtendTransaction() throws IOException, InterruptedException {
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+
+ final TransactionResultEntity entity = new TransactionResultEntity();
+ enqueueResponseBody(entity);
+
+ final TransactionResultEntity resultEntity =
apiClient.extendTransaction(RECEIVE_TRANSACTION_PATH);
+
+ assertNotNull(resultEntity);
+ final RecordedRequest request =
assertRequestRecorded(RECEIVE_TRANSACTION_PATH);
+ assertEquals(PUT_METHOD, request.getMethod());
+ assertProtocolVersionFound(request);
+ }
+
+ @Test
+ void testExtendTransactionServerError() throws InterruptedException {
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+
+ server.enqueue(new
MockResponse.Builder().code(HTTP_INTERNAL_ERROR).build());
+
+ assertThrows(IOException.class, () ->
apiClient.extendTransaction(RECEIVE_TRANSACTION_PATH));
+
+ final RecordedRequest request =
assertRequestRecorded(RECEIVE_TRANSACTION_PATH);
+ assertEquals(PUT_METHOD, request.getMethod());
+ assertProtocolVersionFound(request);
+ }
+
+ @Test
+ void testOpenConnectionForReceive() throws IOException,
InterruptedException {
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+ final PeerDescription peerDescription = new
PeerDescription(server.getHostName(), server.getPort(), false);
+ final HttpCommunicationsSession session = new
HttpCommunicationsSession();
+ final Peer peer = new Peer(peerDescription, session, serverUrl,
serverUrl);
+
+ server.enqueue(new MockResponse.Builder().code(HTTP_OK).build());
+
+ final boolean open =
apiClient.openConnectionForReceive(RECEIVE_TRANSACTION_PATH, peer);
+ assertFalse(open);
+
+ final RecordedRequest request = server.takeRequest(TIMEOUT,
TimeUnit.SECONDS);
+ assertNotNull(request);
+ assertEquals(GET_METHOD, request.getMethod());
+ assertProtocolVersionFound(request);
+ }
+
+ @Test
+ void testOpenConnectionForReceiveAccepted() throws IOException,
InterruptedException {
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+ final String receiveTransactionUri =
RECEIVE_TRANSACTION_FORMAT.formatted(serverUrl);
+
+ final String transactionUri =
initiateTransaction(receiveTransactionUri, TransferDirection.RECEIVE);
+
+ final PeerDescription peerDescription = new
PeerDescription(server.getHostName(), server.getPort(), false);
+ final HttpCommunicationsSession session = new
HttpCommunicationsSession();
+ final Peer peer = new Peer(peerDescription, session, serverUrl,
serverUrl);
+
+ server.enqueue(new MockResponse.Builder().code(HTTP_ACCEPTED).build());
+
+ final boolean open =
apiClient.openConnectionForReceive(transactionUri, peer);
+ assertTrue(open);
+
+ final CommunicationsInput input = session.getInput();
+ try (InputStream inputStream = input.getInputStream()) {
+ final int read = inputStream.read();
+ assertEquals(-1, read);
+ }
+
+ final String transitUri = session.createTransitUri(null, null);
+ final String expectedTransitUri =
FLOW_FILES_PATH_FORMAT.formatted(receiveTransactionUri);
+ assertEquals(expectedTransitUri, transitUri);
+
+ final RecordedRequest request = server.takeRequest(TIMEOUT,
TimeUnit.SECONDS);
+ assertNotNull(request);
+ assertEquals(GET_METHOD, request.getMethod());
+ assertProtocolVersionFound(request);
+ }
+
+ @Test
+ void testOpenConnectionForSend() throws IOException, InterruptedException {
+ final String serverUrl = getServerUrl();
+ apiClient.setBaseUrl(serverUrl);
+
+ final String sendTransactionUri =
SEND_TRANSACTION_FORMAT.formatted(serverUrl);
+ final String transactionUri = initiateTransaction(sendTransactionUri,
TransferDirection.SEND);
+
+ final PeerDescription peerDescription = new
PeerDescription(server.getHostName(), server.getPort(), false);
+ final HttpCommunicationsSession session = new
HttpCommunicationsSession();
+ final Peer peer = new Peer(peerDescription, session, serverUrl,
serverUrl);
+
+ server.enqueue(new
MockResponse.Builder().code(HTTP_ACCEPTED).body(CHECKSUM).build());
+
+ apiClient.openConnectionForSend(transactionUri, peer);
+
+ // Finish Transfer to complete Connection
+ apiClient.finishTransferFlowFiles(session);
+
+ assertEquals(CHECKSUM, session.getChecksum());
+ final byte[] inputBytes =
session.getInput().getInputStream().readAllBytes();
+ assertEquals(CHECKSUM, new String(inputBytes, StandardCharsets.UTF_8));
+
+ final RecordedRequest request = server.takeRequest(TIMEOUT,
TimeUnit.SECONDS);
+ assertNotNull(request);
+ assertEquals(POST_METHOD, request.getMethod());
+ assertProtocolVersionFound(request);
+
+ final Headers headers = request.getHeaders();
+ final String contentType = headers.get(CONTENT_TYPE_HEADER);
+ assertEquals(APPLICATION_OCTET_STREAM, contentType);
+ final String accept = headers.get(ACCEPT_HEADER);
+ assertEquals(TEXT_PLAIN, accept);
+ }
+
+ @Test
+ void testFinishTransferFlowFilesNotStarted() throws IOException {
+ try (CommunicationsSession session =
mock(CommunicationsSession.class)) {
+ assertThrows(IllegalStateException.class, () ->
apiClient.finishTransferFlowFiles(session));
+ }
+ }
+
+ private String initiateTransaction(final String serverUrl, final
TransferDirection transferDirection) throws IOException, InterruptedException {
+ enqueueTransaction(serverUrl);
+
+ final String transactionUri =
apiClient.initiateTransaction(transferDirection, PORT_ID);
+
+ assertEquals(serverUrl, transactionUri);
+
+ final String expectedPath;
+ if (TransferDirection.SEND == transferDirection) {
+ expectedPath = SEND_TRANSACTION_PATH;
+ } else {
+ expectedPath = RECEIVE_TRANSACTION_PATH;
+ }
+
+ final RecordedRequest request = assertRequestRecorded(expectedPath);
+ assertEquals(POST_METHOD, request.getMethod());
+ assertProtocolVersionFound(request);
+
+ return transactionUri;
+ }
+
+ private void enqueueTransaction(final String serverUrl) {
+ server.enqueue(new MockResponse.Builder()
+ .code(HTTP_CREATED)
+ .setHeader(HttpHeaders.LOCATION_URI_INTENT_NAME,
HttpHeaders.LOCATION_URI_INTENT_VALUE)
+ .setHeader(HttpHeaders.LOCATION_HEADER_NAME, serverUrl)
+ .setHeader(HttpHeaders.PROTOCOL_VERSION,
FIRST_PROTOCOL_VERSION)
+ .setHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL,
Integer.toString(TRANSACTION_TTL))
+ .build()
+ );
+ }
+
+ private void enqueueController() throws JsonProcessingException {
+ final ControllerDTO expected = new ControllerDTO();
+ expected.setId(ControllerDTO.class.getName());
+ final ControllerEntity entity = new ControllerEntity();
+ entity.setController(expected);
+
+ enqueueResponseBody(entity);
+ }
+
+ private void enqueueResponseBody(final Object responseBody) throws
JsonProcessingException {
+ final String body = objectMapper.writeValueAsString(responseBody);
+
+ server.enqueue(new
MockResponse.Builder().code(HTTP_OK).body(body).build());
+ }
+
+ private String getServerUrl() {
+ final HttpUrl url = server.url(NIFI_API_PATH);
+ return url.toString();
+ }
+
+ private void assertProtocolVersionFound(final RecordedRequest request) {
+ final Headers headers = request.getHeaders();
+ final String protocolVersion =
headers.get(HttpHeaders.PROTOCOL_VERSION);
+ assertEquals(FIRST_PROTOCOL_VERSION, protocolVersion);
+ }
+
+ private void assertResponseCodeEquals(final RecordedRequest request, final
ResponseCode responseCode) {
+ final String responseCodeParameter =
request.getUrl().queryParameter(RESPONSE_CODE_PARAMETER);
+ assertNotNull(responseCodeParameter);
+
+ final int codeParameter = Integer.parseInt(responseCodeParameter);
+ assertEquals(responseCode.getCode(), codeParameter);
+ }
+
+ private RecordedRequest assertRequestRecorded(final String expectedPath)
throws InterruptedException {
+ final RecordedRequest request = server.takeRequest(TIMEOUT,
TimeUnit.SECONDS);
+ assertNotNull(request);
+
+ final Headers headers = request.getHeaders();
+ final String accept = headers.get(ACCEPT_HEADER);
+
+ final String method = request.getMethod();
+ if (POST_METHOD.equals(method)) {
+ assertEquals(APPLICATION_JSON, accept);
+ }
+
+ final HttpUrl url = request.getUrl();
+ final String encodedPath = url.encodedPath();
+
+ final String path = NIFI_API_PATH + expectedPath;
+ assertEquals(path, encodedPath);
+
+ return request;
+ }
+}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
index 9c566ae0e6..923908cef7 100644
---
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
@@ -21,7 +21,7 @@ import org.junit.jupiter.api.Test;
import java.util.Iterator;
import java.util.Set;
-import static
org.apache.nifi.remote.util.SiteToSiteRestApiClient.parseClusterUrls;
+import static org.apache.nifi.remote.util.ClusterUrlParser.parseClusterUrls;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git
a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
index c776245ae1..2e49467252 100644
---
a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
+++
b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
@@ -35,7 +35,7 @@ import org.apache.nifi.remote.SiteToSiteEventReporter;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
-import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.remote.util.ClusterUrlParser;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.ssl.SSLContextProvider;
@@ -155,7 +155,7 @@ public class SiteToSiteUtils {
stateManager = ((ReportingContext)
reportContext).getStateManager();
}
return new SiteToSiteClient.Builder()
- .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
+ .urls(ClusterUrlParser.parseClusterUrls(destinationUrl))
.portName(reportContext.getProperty(SiteToSiteUtils.PORT_NAME).getValue())
.useCompression(reportContext.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
.eventReporter(eventReporter)
@@ -172,7 +172,7 @@ public class SiteToSiteUtils {
public ValidationResult validate(final String subject, final String
input, final ValidationContext context) {
final String value =
context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
try {
- SiteToSiteRestApiClient.parseClusterUrls(value);
+ ClusterUrlParser.parseClusterUrls(value);
return new ValidationResult.Builder()
.input(input)
.subject(subject)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 1fe34b17fb..971577bb16 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.IOException;
-import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
@@ -91,10 +90,6 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
private static final Logger logger =
LoggerFactory.getLogger(StandardRemoteProcessGroup.class);
- // status codes
- private static final int UNAUTHORIZED_STATUS_CODE =
HttpURLConnection.HTTP_UNAUTHORIZED;
- private static final int FORBIDDEN_STATUS_CODE =
HttpURLConnection.HTTP_FORBIDDEN;
-
private final String id;
private volatile String targetUris;
@@ -413,7 +408,18 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
@Override
public String getTargetUri() {
- return SiteToSiteRestApiClient.getFirstUrl(targetUris);
+ final String targetUri;
+ if (targetUris == null) {
+ targetUri = null;
+ } else {
+ final int commaIndex = targetUris.indexOf(',');
+ if (commaIndex > -1) {
+ targetUri = targetUris.substring(0, commaIndex);
+ } else {
+ targetUri = targetUris;
+ }
+ }
+ return targetUri;
}
@Override
@@ -1256,36 +1262,24 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
}
try (final SiteToSiteRestApiClient apiClient =
getSiteToSiteRestApiClient()) {
- try {
- final ControllerDTO dto =
apiClient.getController(targetUris);
-
- if (dto.getRemoteSiteListeningPort() == null &&
SiteToSiteTransportProtocol.RAW.equals(transportProtocol)) {
- authorizationIssue = "Remote instance is not
configured to allow RAW Site-to-Site communications at this time.";
- } else if (dto.getRemoteSiteHttpListeningPort() == null &&
SiteToSiteTransportProtocol.HTTP.equals(transportProtocol)) {
- authorizationIssue = "Remote instance is not
configured to allow HTTP Site-to-Site communications at this time.";
- } else {
- authorizationIssue = null;
- }
+ final ControllerDTO dto = apiClient.getController(targetUris);
- writeLock.lock();
- try {
- listeningPort = dto.getRemoteSiteListeningPort();
- listeningHttpPort =
dto.getRemoteSiteHttpListeningPort();
- destinationSecure = dto.isSiteToSiteSecure();
- } finally {
- writeLock.unlock();
- }
- } catch (SiteToSiteRestApiClient.HttpGetFailedException e) {
- final int responseCode = e.getResponseCode();
- if (responseCode == UNAUTHORIZED_STATUS_CODE ||
responseCode == FORBIDDEN_STATUS_CODE) {
- authorizationIssue = e.getDescription();
- } else {
- final String message = e.getDescription();
- logger.warn("{} When communicating with remote
instance, got unexpected result. {}", this, message);
- authorizationIssue = "Unable to determine Site-to-Site
availability.";
- }
+ if (dto.getRemoteSiteListeningPort() == null &&
SiteToSiteTransportProtocol.RAW.equals(transportProtocol)) {
+ authorizationIssue = "Remote instance is not configured to
allow RAW Site-to-Site communications at this time.";
+ } else if (dto.getRemoteSiteHttpListeningPort() == null &&
SiteToSiteTransportProtocol.HTTP.equals(transportProtocol)) {
+ authorizationIssue = "Remote instance is not configured to
allow HTTP Site-to-Site communications at this time.";
+ } else {
+ authorizationIssue = null;
}
+ writeLock.lock();
+ try {
+ listeningPort = dto.getRemoteSiteListeningPort();
+ listeningHttpPort = dto.getRemoteSiteHttpListeningPort();
+ destinationSecure = dto.isSiteToSiteSecure();
+ } finally {
+ writeLock.unlock();
+ }
} catch (final Exception e) {
logger.warn("Unable to connect to {} due to {}",
StandardRemoteProcessGroup.this, e.toString());
getEventReporter().reportEvent(Severity.WARNING, "Site to
Site", String.format("Unable to connect to %s due to %s",
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 0b47c280b7..1c091c9234 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -63,10 +63,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>c2-protocol-component-api</artifactId>
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
index 2e3310ee99..9ff35fd068 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
@@ -17,7 +17,6 @@
package org.apache.nifi.controller.queue.clustered.server;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter;
@@ -27,7 +26,6 @@ import
org.apache.nifi.security.cert.StandardPeerIdentityProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
import java.io.IOException;
@@ -38,51 +36,66 @@ import java.util.stream.Collectors;
public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer {
private static final Logger logger =
LoggerFactory.getLogger(ClusterLoadBalanceAuthorizer.class);
+ private static final char WILDCARD = '*';
+
private final PeerIdentityProvider peerIdentityProvider = new
StandardPeerIdentityProvider();
private final ClusterCoordinator clusterCoordinator;
private final EventReporter eventReporter;
- private final HostnameVerifier hostnameVerifier;
public ClusterLoadBalanceAuthorizer(final ClusterCoordinator
clusterCoordinator, final EventReporter eventReporter) {
this.clusterCoordinator = clusterCoordinator;
this.eventReporter = eventReporter;
- this.hostnameVerifier = new DefaultHostnameVerifier();
}
@Override
- public String authorize(SSLSocket sslSocket) throws IOException {
+ public String authorize(final SSLSocket sslSocket) throws IOException {
final SSLSession sslSession = sslSocket.getSession();
final Certificate[] peerCertificates =
sslSession.getPeerCertificates();
final Set<String> clientIdentities =
peerIdentityProvider.getIdentities(peerCertificates);
- logger.debug("Will perform authorization against Client Identities
'{}'", clientIdentities);
-
final Set<String> nodeIds =
clusterCoordinator.getNodeIdentifiers().stream()
.map(NodeIdentifier::getApiAddress)
.collect(Collectors.toSet());
+ logger.debug("Authorizing Peer {} against Cluster Nodes {}",
clientIdentities, nodeIds);
+
for (final String clientId : clientIdentities) {
if (nodeIds.contains(clientId)) {
- logger.debug("Client ID '{}' is in the list of Nodes in the
Cluster. Authorizing Client to Load Balance data", clientId);
+ logger.debug("Peer Certificate Identity [{}] authorized for
Load Balancing from Cluster Node Identifiers", clientId);
return clientId;
}
}
- // If there are no matches of Client IDs, try to verify it by
HostnameVerifier. In this way, we can support wildcard certificates.
for (final String nodeId : nodeIds) {
- if (hostnameVerifier.verify(nodeId, sslSession)) {
+ if (isNodeIdMatched(nodeId, clientIdentities)) {
final String clientId =
sslSocket.getInetAddress().getHostName();
- logger.debug("The request was verified with node '{}'. The
hostname derived from the socket is '{}'. Authorizing Client to Load Balance
data", nodeId, clientId);
+ logger.debug("Peer Socket Address [{}] for Node Identifier
[{}] authorized for Load Balancing from Certificate Wildcard", clientId,
nodeId);
return clientId;
}
}
- final String message = "Authorization failed for Client ID's to Load
Balance data because none of the ID's are known Cluster Node Identifiers";
-
+ final String message = "Peer Certificate Identities %s not authorized
for Load Balancing".formatted(clientIdentities);
logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, "Load Balanced
Connections", message);
- throw new NotAuthorizedException("Client ID's " + clientIdentities + "
are not authorized to Load Balance data");
+ throw new NotAuthorizedException(message);
+ }
+
+ private boolean isNodeIdMatched(final String nodeId, final Set<String>
clientIdentities) {
+ boolean matched = false;
+
+ for (final String clientIdentity : clientIdentities) {
+ final int wildcardIndex = clientIdentity.indexOf(WILDCARD);
+ if (wildcardIndex == 0) {
+ final String clientIdentityDomain =
clientIdentity.substring(1);
+ if (nodeId.endsWith(clientIdentityDomain)) {
+ matched = true;
+ break;
+ }
+ }
+ }
+
+ return matched;
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/ClusteredLoadBalanceAuthorizerTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/ClusteredLoadBalanceAuthorizerTest.java
new file mode 100644
index 0000000000..4974f65a81
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/ClusteredLoadBalanceAuthorizerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue.clustered.server;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.security.cert.GeneralNameType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class ClusteredLoadBalanceAuthorizerTest {
+ private static final String LOCALHOST_ADDRESS = "localhost.local";
+
+ private static final String WILDCARD_LOCAL_DOMAIN = "*.local";
+
+ private static final String DNS_ADDRESS = "nifi.apache.org";
+
+ @Mock
+ private ClusterCoordinator clusterCoordinator;
+
+ @Mock
+ private SSLSocket sslSocket;
+
+ @Mock
+ private SSLSession sslSession;
+
+ @Mock
+ private X509Certificate peerCertificate;
+
+ @Mock
+ private NodeIdentifier nodeIdentifier;
+
+ private ClusterLoadBalanceAuthorizer authorizer;
+
+ @BeforeEach
+ void setAuthorizer() {
+ authorizer = new ClusterLoadBalanceAuthorizer(clusterCoordinator,
EventReporter.NO_OP);
+ }
+
+ @Test
+ void testAuthorizeNotAuthorized() throws IOException {
+ when(sslSocket.getSession()).thenReturn(sslSession);
+ when(sslSession.getPeerCertificates()).thenReturn(new
Certificate[]{peerCertificate});
+
+ assertThrows(NotAuthorizedException.class, () ->
authorizer.authorize(sslSocket));
+ }
+
+ @Test
+ void testAuthorizeNodeIdentifierAuthorized() throws IOException,
CertificateParsingException {
+ when(sslSocket.getSession()).thenReturn(sslSession);
+ when(sslSession.getPeerCertificates()).thenReturn(new
Certificate[]{peerCertificate});
+
+ setPeerCertificate(LOCALHOST_ADDRESS);
+
+ when(nodeIdentifier.getApiAddress()).thenReturn(LOCALHOST_ADDRESS);
+
when(clusterCoordinator.getNodeIdentifiers()).thenReturn(Set.of(nodeIdentifier));
+
+ final String authorized = authorizer.authorize(sslSocket);
+ assertEquals(LOCALHOST_ADDRESS, authorized);
+ }
+
+ @Test
+ void testAuthorizeWildcardAuthorized() throws IOException,
CertificateParsingException {
+ when(sslSocket.getSession()).thenReturn(sslSession);
+ when(sslSession.getPeerCertificates()).thenReturn(new
Certificate[]{peerCertificate});
+ final InetAddress socketAddress = InetAddress.getLocalHost();
+ final String socketHostName = socketAddress.getHostName();
+ when(sslSocket.getInetAddress()).thenReturn(socketAddress);
+
+ setPeerCertificate(WILDCARD_LOCAL_DOMAIN);
+
+ when(nodeIdentifier.getApiAddress()).thenReturn(LOCALHOST_ADDRESS);
+
when(clusterCoordinator.getNodeIdentifiers()).thenReturn(Set.of(nodeIdentifier));
+
+ final String authorized = authorizer.authorize(sslSocket);
+ assertEquals(socketHostName, authorized);
+ }
+
+ @Test
+ void testAuthorizeWildcardNotAuthorized() throws IOException,
CertificateParsingException {
+ when(sslSocket.getSession()).thenReturn(sslSession);
+ when(sslSession.getPeerCertificates()).thenReturn(new
Certificate[]{peerCertificate});
+
+ setPeerCertificate(WILDCARD_LOCAL_DOMAIN);
+
+ when(nodeIdentifier.getApiAddress()).thenReturn(DNS_ADDRESS);
+
when(clusterCoordinator.getNodeIdentifiers()).thenReturn(Set.of(nodeIdentifier));
+
+ assertThrows(NotAuthorizedException.class, () ->
authorizer.authorize(sslSocket));
+ }
+
+ private void setPeerCertificate(final String name) throws
CertificateParsingException {
+ final List<?> subjectAlternativeName =
List.of(GeneralNameType.DNS_NAME.getNameType(), name);
+ final Collection<List<?>> names = List.of(subjectAlternativeName);
+ when(peerCertificate.getSubjectAlternativeNames()).thenReturn(names);
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
b/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index b19b1bbee6..88401b0fa6 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -43,7 +43,7 @@ import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.http.HttpProxy;
-import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.remote.util.ClusterUrlParser;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -162,7 +162,7 @@ public class StandardRemoteGroupPort extends
RemoteGroupPort {
final SiteToSiteEventReporter eventReporter = (severity, category,
message) -> remoteGroup.getEventReporter().reportEvent(severity, category,
message);
final SiteToSiteClient.Builder clientBuilder = new
SiteToSiteClient.Builder()
-
.urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris()))
+
.urls(ClusterUrlParser.parseClusterUrls(remoteGroup.getTargetUris()))
.portIdentifier(getTargetIdentifier())
.sslContext(sslContext)
.useCompression(isUseCompression())
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/pom.xml
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/pom.xml
index 1eac909991..85d54d28a6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/pom.xml
@@ -120,10 +120,6 @@
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-web</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 51bfc22068..bb21091b16 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -74,7 +74,7 @@ import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
-import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.remote.util.ClusterUrlParser;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
@@ -2054,7 +2054,7 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
// parse the uri to check if the uri is valid
final String targetUris =
remoteProcessGroupDTO.getTargetUris();
- SiteToSiteRestApiClient.parseClusterUrls(targetUris);
+ ClusterUrlParser.parseClusterUrls(targetUris);
// since the uri is valid, use it
remoteProcessGroupDTO.setTargetUris(targetUris);