This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4db5f05436 NIFI-15214 Refactored site-to-site-client using Java HttpClient
4db5f05436 is described below

commit 4db5f05436fe75ddc3441757338c2535e4d33214
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Nov 12 13:28:53 2025 -0600

    NIFI-15214 Refactored site-to-site-client using Java HttpClient
    
    - Refactored ClusterLoadBalanceAuthorizer wildcard certificate verification
    - Removed Apache HttpClient 4 from framework modules
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10522.
---
 nifi-commons/nifi-site-to-site-client/pom.xml      |   53 +-
 .../nifi/remote/protocol/http/HttpProxy.java       |   12 -
 .../apache/nifi/remote/util/ClusterUrlParser.java  |  140 +++
 .../nifi/remote/util/SiteToSiteRestApiClient.java  | 1211 ++++----------------
 .../remote/util/SiteToSiteRestApiClientTest.java   |  497 ++++++++
 .../remote/util/TestSiteToSiteRestApiClient.java   |    2 +-
 .../apache/nifi/reporting/s2s/SiteToSiteUtils.java |    6 +-
 .../nifi/remote/StandardRemoteProcessGroup.java    |   60 +-
 .../nifi-framework/nifi-framework-core/pom.xml     |    4 -
 .../server/ClusterLoadBalanceAuthorizer.java       |   41 +-
 .../server/ClusteredLoadBalanceAuthorizerTest.java |  131 +++
 .../nifi/remote/StandardRemoteGroupPort.java       |    4 +-
 .../nifi-framework/nifi-web/nifi-jetty/pom.xml     |    4 -
 .../apache/nifi/web/api/ProcessGroupResource.java  |    4 +-
 14 files changed, 1076 insertions(+), 1093 deletions(-)

diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml
index 8423c64997..c19b3bc8d7 100644
--- a/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -24,10 +24,6 @@
 
     <artifactId>nifi-site-to-site-client</artifactId>
 
-    <properties>
-        
<httpclient.version>${org.apache.httpcomponents.httpclient.version}</httpclient.version>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -71,53 +67,22 @@
             <version>2.7.0-SNAPSHOT</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpcore-nio</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpcore</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>jakarta.xml.bind</groupId>
+            <artifactId>jakarta.xml.bind-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpasyncclient</artifactId>
-            <version>4.1.5</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>jakarta.xml.bind</groupId>
-            <artifactId>jakarta.xml.bind-api</artifactId>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver3</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
index 4c0ebe73b4..ebfbbe413c 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
@@ -16,9 +16,6 @@
  */
 package org.apache.nifi.remote.protocol.http;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHost;
-
 public class HttpProxy {
     private final String host;
     private final Integer port;
@@ -32,7 +29,6 @@ public class HttpProxy {
         this.password = password;
     }
 
-
     public String getHost() {
         return host;
     }
@@ -48,12 +44,4 @@ public class HttpProxy {
     public String getPassword() {
         return password;
     }
-
-    public HttpHost getHttpHost() {
-        if (StringUtils.isEmpty(host)) {
-            return null;
-        }
-        return new HttpHost(host, port == null ? 80 : port);
-    }
-
 }
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ClusterUrlParser.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ClusterUrlParser.java
new file mode 100644
index 0000000000..1426765833
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ClusterUrlParser.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Site-To-Site Cluster URL Parser
+ */
+public class ClusterUrlParser {
+    /**
+     * Parse the comma-separated URLs string for the remote NiFi instances.
+     *
+     * @return A set containing one or more URLs
+     * @throws IllegalArgumentException when it fails to parse the URLs string,
+     * URLs string contains multiple protocols (http and https mix),
+     * or none of URL is specified.
+     */
+    public static Set<String> parseClusterUrls(final String clusterUrls) {
+        final Set<String> urls = new LinkedHashSet<>();
+        if (clusterUrls != null && !clusterUrls.isEmpty()) {
+            Arrays.stream(clusterUrls.split(","))
+                    .map(String::trim)
+                    .filter(s -> !s.isEmpty())
+                    .forEach(s -> {
+                        validateUriString(s);
+                        urls.add(resolveBaseUrl(s).intern());
+                    });
+        }
+
+        if (urls.isEmpty()) {
+            throw new IllegalArgumentException("Cluster URL was not 
specified.");
+        }
+
+        final Predicate<String> isHttps = url -> 
url.toLowerCase().startsWith("https:");
+        if (urls.stream().anyMatch(isHttps) && 
urls.stream().anyMatch(isHttps.negate())) {
+            throw new IllegalArgumentException("Different protocols are used 
in the cluster URLs " + clusterUrls);
+        }
+
+        return Collections.unmodifiableSet(urls);
+    }
+
+    static String resolveBaseUrl(final String clusterUrl) {
+        Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null.");
+        final URI uri;
+        try {
+            uri = new URI(clusterUrl.trim());
+        } catch (final URISyntaxException e) {
+            throw new IllegalArgumentException("The specified URL is 
malformed: " + clusterUrl);
+        }
+
+        return resolveBaseUrl(uri);
+    }
+
+    private static void validateUriString(String s) {
+        // parse the uri
+        final URI uri;
+        try {
+            uri = URI.create(s);
+        } catch (final IllegalArgumentException e) {
+            throw new IllegalArgumentException("The specified remote process 
group URL is malformed: " + s);
+        }
+
+        // validate each part of the uri
+        if (uri.getScheme() == null || uri.getHost() == null) {
+            throw new IllegalArgumentException("The specified remote process 
group URL is malformed: " + s);
+        }
+
+        if (!(uri.getScheme().equalsIgnoreCase("http") || 
uri.getScheme().equalsIgnoreCase("https"))) {
+            throw new IllegalArgumentException("The specified remote process 
group URL is invalid because it is not http or https: " + s);
+        }
+    }
+
+    /**
+     * Resolve NiFi API url with leniency. This method does following 
conversion on uri path:
+     * <ul>
+     * <li>/ to /nifi-api</li>
+     * <li>/nifi to /nifi-api</li>
+     * <li>/some/path/ to /some/path/nifi-api</li>
+     * </ul>
+     * @param clusterUrl url to be resolved
+     * @return resolved url
+     */
+    private static String resolveBaseUrl(final URI clusterUrl) {
+
+        if (clusterUrl.getScheme() == null || clusterUrl.getHost() == null) {
+            throw new IllegalArgumentException("The specified URL is 
malformed: " + clusterUrl);
+        }
+
+        if (!(clusterUrl.getScheme().equalsIgnoreCase("http") || 
clusterUrl.getScheme().equalsIgnoreCase("https"))) {
+            throw new IllegalArgumentException("The specified URL is invalid 
because it is not http or https: " + clusterUrl);
+        }
+
+
+        String uriPath = clusterUrl.getPath().trim();
+
+        if (StringUtils.isEmpty(uriPath) || uriPath.equals("/")) {
+            uriPath = "/nifi";
+        } else if (uriPath.endsWith("/")) {
+            uriPath = uriPath.substring(0, uriPath.length() - 1);
+        }
+
+        final StringBuilder uriPathBuilder = new StringBuilder(uriPath);
+        if (uriPath.endsWith("/nifi")) {
+            uriPathBuilder.append("-api");
+        } else if (!uriPath.endsWith("/nifi-api")) {
+            uriPathBuilder.append("/nifi-api");
+        }
+
+        try {
+            final URI uri = new URI(clusterUrl.getScheme(), null, 
clusterUrl.getHost(), clusterUrl.getPort(), uriPathBuilder.toString(), null, 
null);
+            return uri.toString();
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 523f22d8c3..10e6fb1a3b 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -27,79 +27,40 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
+import java.net.Authenticator;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.PasswordAuthentication;
+import java.net.ProxySelector;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
 import java.nio.charset.StandardCharsets;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
-import java.util.Arrays;
+import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
+
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpException;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpInetConnection;
-import org.apache.http.HttpRequest;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpResponseInterceptor;
-import org.apache.http.StatusLine;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.AuthState;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.client.utils.URIUtils;
-import org.apache.http.conn.ManagedHttpClientConnection;
-import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
-import org.apache.http.nio.ContentEncoder;
-import org.apache.http.nio.IOControl;
-import org.apache.http.nio.conn.ManagedNHttpClientConnection;
-import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
-import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
-import org.apache.http.protocol.HttpContext;
-import org.apache.http.protocol.HttpCoreContext;
-import org.apache.http.util.EntityUtils;
 import org.apache.nifi.remote.SiteToSiteEventReporter;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.TransferDirection;
@@ -115,7 +76,6 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.remote.protocol.http.HttpHeaders;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
-import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.security.cert.StandardPrincipalFormatter;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.web.api.dto.ControllerDTO;
@@ -126,6 +86,11 @@ import org.apache.nifi.web.api.entity.TransactionResultEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.net.HttpURLConnection.HTTP_ACCEPTED;
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
+import static java.net.HttpURLConnection.HTTP_OK;
 import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
 import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
 import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
@@ -137,31 +102,22 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTE
 
 public class SiteToSiteRestApiClient implements Closeable {
 
-    private static final String EVENT_CATEGORY = "Site-to-Site";
+    private static final String ACCEPT_HEADER = "Accept";
+    private static final String APPLICATION_JSON = "application/json";
     private static final int DATA_PACKET_CHANNEL_READ_BUFFER_SIZE = 16384;
 
-    private static final int RESPONSE_CODE_OK = 200;
-    private static final int RESPONSE_CODE_CREATED = 201;
-    private static final int RESPONSE_CODE_ACCEPTED = 202;
-    private static final int RESPONSE_CODE_BAD_REQUEST = 400;
-    private static final int RESPONSE_CODE_FORBIDDEN = 403;
-    private static final int RESPONSE_CODE_NOT_FOUND = 404;
-
     private static final Logger logger = 
LoggerFactory.getLogger(SiteToSiteRestApiClient.class);
+    private static final ObjectMapper objectMapper = new 
ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
 
     private String baseUrl;
     protected final SSLContext sslContext;
     protected final HttpProxy proxy;
-    private final AtomicBoolean proxyAuthRequiresResend = new 
AtomicBoolean(false);
     private final SiteToSiteEventReporter eventReporter;
 
-    private RequestConfig requestConfig;
-    private CredentialsProvider credentialsProvider;
-    private CloseableHttpClient httpClient;
-    private CloseableHttpAsyncClient httpAsyncClient;
+    private final HttpClient.Builder httpClientBuilder;
+    private HttpClient httpClient;
 
     private boolean compress = false;
-    private InetAddress localAddress = null;
     private long requestExpirationMillis = 0;
     private int serverTransactionTtl = 0;
     private int batchCount = 0;
@@ -173,159 +129,63 @@ public class SiteToSiteRestApiClient implements Closeable {
     private final ScheduledExecutorService ttlExtendTaskExecutor;
     private ScheduledFuture<?> ttlExtendingFuture;
 
-    private int connectTimeoutMillis;
     private int readTimeoutMillis;
     private long cacheExpirationMillis = 30000L;
     private static final Pattern HTTP_ABS_URL = 
Pattern.compile("^https?://.+$");
 
-    private Future<HttpResponse> postResult;
-    private CountDownLatch transferDataLatch = new CountDownLatch(1);
+    private CompletableFuture<HttpResponse<String>> transactionFuture;
 
     private static final ConcurrentMap<String, RemoteGroupContents> 
contentsMap = new ConcurrentHashMap<>();
-    private volatile long lastPruneTimestamp = System.currentTimeMillis();
+    private final long lastPruneTimestamp = System.currentTimeMillis();
 
     public SiteToSiteRestApiClient(final SSLContext sslContext, final 
HttpProxy proxy, final SiteToSiteEventReporter eventReporter) {
         this.sslContext = sslContext;
         this.proxy = proxy;
         this.eventReporter = eventReporter;
 
-        ttlExtendTaskExecutor = Executors.newScheduledThreadPool(1, new 
ThreadFactory() {
-            private final ThreadFactory defaultFactory = 
Executors.defaultThreadFactory();
+        final ThreadFactory threadFactory = 
Thread.ofVirtual().name(SiteToSiteRestApiClient.class.getSimpleName(), 
1).factory();
+        ttlExtendTaskExecutor = Executors.newScheduledThreadPool(1, 
threadFactory);
 
-            @Override
-            public Thread newThread(final Runnable r) {
-                final Thread thread = defaultFactory.newThread(r);
-                thread.setName(Thread.currentThread().getName() + " 
Site-to-Site Extend Transactions");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
+        httpClientBuilder = HttpClient.newBuilder();
+        if (sslContext != null) {
+            httpClientBuilder.sslContext(sslContext);
+        }
+        if (proxy != null && proxy.getPort() != null) {
+            final InetSocketAddress proxyAddress = new 
InetSocketAddress(proxy.getHost(), proxy.getPort());
+            final ProxySelector proxySelector = ProxySelector.of(proxyAddress);
+            httpClientBuilder.proxy(proxySelector);
+            httpClientBuilder.authenticator(getProxyAuthenticator());
+        }
+        httpClient = httpClientBuilder.build();
     }
 
     @Override
     public void close() throws IOException {
         stopExtendingTransaction();
-        closeSilently(httpClient);
-        closeSilently(httpAsyncClient);
-    }
-
-    private CloseableHttpClient getHttpClient() {
-        if (httpClient == null) {
-            setupClient();
-        }
-        return httpClient;
-    }
-
-    private CloseableHttpAsyncClient getHttpAsyncClient() {
-        if (httpAsyncClient == null) {
-            setupAsyncClient();
-        }
-        return httpAsyncClient;
-    }
-
-    private RequestConfig getRequestConfig() {
-        if (requestConfig == null) {
-            setupRequestConfig();
-        }
-        return requestConfig;
-    }
-
-    private CredentialsProvider getCredentialsProvider() {
-        if (credentialsProvider == null) {
-            setupCredentialsProvider();
-        }
-        return credentialsProvider;
-    }
-
-    private void setupRequestConfig() {
-        final RequestConfig.Builder requestConfigBuilder = 
RequestConfig.custom()
-            .setConnectionRequestTimeout(connectTimeoutMillis)
-            .setConnectTimeout(connectTimeoutMillis)
-            .setSocketTimeout(readTimeoutMillis);
-
-        if (localAddress != null) {
-            requestConfigBuilder.setLocalAddress(localAddress);
-        }
-
-        if (proxy != null) {
-            requestConfigBuilder.setProxy(proxy.getHttpHost());
-        }
-
-        requestConfig = requestConfigBuilder.build();
-    }
-
-    private void setupCredentialsProvider() {
-        credentialsProvider = new BasicCredentialsProvider();
-        if (proxy != null) {
-            if (StringUtils.isNotEmpty(proxy.getUsername()) && 
StringUtils.isNotEmpty(proxy.getPassword())) {
-                credentialsProvider.setCredentials(
-                    new AuthScope(proxy.getHttpHost()),
-                    new UsernamePasswordCredentials(proxy.getUsername(), 
proxy.getPassword()));
-            }
-
-        }
-    }
-
-    private void setupClient() {
-        final HttpClientBuilder clientBuilder = HttpClients.custom();
-
-        if (sslContext != null) {
-            clientBuilder.setSSLContext(sslContext);
-            clientBuilder.addInterceptorFirst(new HttpsResponseInterceptor());
-        }
-
-        httpClient = clientBuilder
-            .setDefaultCredentialsProvider(getCredentialsProvider()).build();
-    }
-
-    private void setupAsyncClient() {
-        final HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
-
-        if (sslContext != null) {
-            clientBuilder.setSSLContext(sslContext);
-            clientBuilder.addInterceptorFirst(new HttpsResponseInterceptor());
-        }
-
-        httpAsyncClient = 
clientBuilder.setDefaultCredentialsProvider(getCredentialsProvider()).build();
-        httpAsyncClient.start();
+        httpClient.shutdown();
+        httpClient.close();
     }
 
-    private class HttpsResponseInterceptor implements HttpResponseInterceptor {
-        @Override
-        public void process(final HttpResponse response, final HttpContext 
httpContext) throws HttpException, IOException {
-            final HttpCoreContext coreContext = 
HttpCoreContext.adapt(httpContext);
-            final HttpInetConnection conn = 
coreContext.getConnection(HttpInetConnection.class);
-            if (!conn.isOpen()) {
-                return;
-            }
+    private Authenticator getProxyAuthenticator() {
+        final String username = proxy.getUsername();
+        final char[] password = proxy.getPassword().toCharArray();
 
-            final SSLSession sslSession;
-            if (conn instanceof ManagedHttpClientConnection) {
-                sslSession = ((ManagedHttpClientConnection) 
conn).getSSLSession();
-            } else if (conn instanceof ManagedNHttpClientConnection) {
-                sslSession = ((ManagedNHttpClientConnection) 
conn).getSSLSession();
-            } else {
-                throw new RuntimeException("Unexpected connection type was 
used, " + conn);
-            }
+        return new Authenticator() {
+            private final PasswordAuthentication authentication = new 
PasswordAuthentication(username, password);
 
+            @Override
+            protected PasswordAuthentication getPasswordAuthentication() {
+                final PasswordAuthentication passwordAuthentication;
 
-            if (sslSession != null) {
-                final Certificate[] certChain = 
sslSession.getPeerCertificates();
-                if (certChain == null || certChain.length == 0) {
-                    throw new SSLPeerUnverifiedException("No certificates 
found");
+                if (RequestorType.PROXY == getRequestorType()) {
+                    passwordAuthentication = authentication;
+                } else {
+                    passwordAuthentication = null;
                 }
 
-                try {
-                    final X509Certificate cert = (X509Certificate) 
certChain[0];
-                    trustedPeerDn = 
StandardPrincipalFormatter.getInstance().getSubject(cert);
-                } catch (final RuntimeException e) {
-                    final String msg = "Could not extract subject DN from SSL 
session peer certificate";
-                    logger.warn(msg);
-                    eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY, msg);
-                    throw new SSLPeerUnverifiedException(msg);
-                }
+                return passwordAuthentication;
             }
-        }
+        };
     }
 
     /**
@@ -338,7 +198,7 @@ public class SiteToSiteRestApiClient implements Closeable {
      * or none of URL is specified.
      */
     public ControllerDTO getController(final String clusterUrls) throws 
IOException {
-        return getController(parseClusterUrls(clusterUrls));
+        return getController(ClusterUrlParser.parseClusterUrls(clusterUrls));
     }
 
     /**
@@ -347,19 +207,15 @@ public class SiteToSiteRestApiClient implements Closeable {
      * After this method execution, the base URL is set with the successful 
URL.
      */
     public ControllerDTO getController(final Set<String> clusterUrls) throws 
IOException {
-
-        IOException lastException = null;
+        IOException lastException = new IOException("Get Controller failed");
         for (final String clusterUrl : clusterUrls) {
             // The url may not be normalized if it passed directly without 
parsed with parseClusterUrls.
-            setBaseUrl(resolveBaseUrl(clusterUrl));
+            setBaseUrl(ClusterUrlParser.resolveBaseUrl(clusterUrl));
             try {
                 return getController();
             } catch (IOException e) {
                 lastException = e;
                 logger.warn("Failed to get controller from {}", clusterUrl, e);
-                if (logger.isDebugEnabled()) {
-                    logger.debug("", e);
-                }
             }
         }
 
@@ -386,7 +242,8 @@ public class SiteToSiteRestApiClient implements Closeable {
 
                 final ControllerDTO refreshedContents;
                 try {
-                    refreshedContents = fetchController();
+                    final HttpRequest.Builder requestBuilder = 
HttpRequest.newBuilder(getUri("/site-to-site"));
+                    refreshedContents = send(requestBuilder, 
ControllerEntity.class).getController();
                 } catch (final Exception e) {
                     // we failed to refresh contents, but we don't want to 
constantly poll the remote instance, failing.
                     // So we put the ControllerDTO back but use a new 
RemoteGroupContents so that we get a new timestamp.
@@ -408,20 +265,6 @@ public class SiteToSiteRestApiClient implements Closeable {
         }
     }
 
-    private ControllerDTO fetchController() throws IOException {
-        try {
-            final HttpGet get = createGetControllerRequest();
-            return execute(get, ControllerEntity.class).getController();
-        } catch (final HttpGetFailedException e) {
-            if (RESPONSE_CODE_NOT_FOUND == e.getResponseCode()) {
-                logger.debug("getController received NOT_FOUND, trying to 
access the old NiFi version resource url...");
-                final HttpGet get = createGet("/controller");
-                return execute(get, ControllerEntity.class).getController();
-            }
-            throw e;
-        }
-    }
-
     private void pruneCache() {
         for (final Map.Entry<String, RemoteGroupContents> entry : 
contentsMap.entrySet()) {
             final String url = entry.getKey();
@@ -436,234 +279,61 @@ public class SiteToSiteRestApiClient implements 
Closeable {
         }
     }
 
-    private HttpGet createGetControllerRequest() {
-        final HttpGet get = createGet("/site-to-site");
-        get.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-        return get;
-    }
-
     public Collection<PeerDTO> getPeers() throws IOException {
-        final HttpGet get = createGet("/site-to-site/peers");
-        get.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-        return execute(get, PeersEntity.class).getPeers();
+        final HttpRequest.Builder requestBuilder = 
HttpRequest.newBuilder(getUri("/site-to-site/peers"));
+        return send(requestBuilder, PeersEntity.class).getPeers();
     }
 
     public String initiateTransaction(final TransferDirection direction, final 
String portId) throws IOException {
-
         final String portType = TransferDirection.RECEIVE.equals(direction) ? 
"output-ports" : "input-ports";
         logger.debug("initiateTransaction handshaking portType={}, portId={}", 
portType, portId);
 
-        final HttpPost post = createPost("/data-transfer/" + portType + "/" + 
portId + "/transactions");
+        final URI uri = getUri("/data-transfer/" + portType + "/" + portId + 
"/transactions");
+        final HttpRequest.Builder requestBuilder = 
HttpRequest.newBuilder(uri).POST(HttpRequest.BodyPublishers.noBody());
+        requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
 
-        post.setHeader("Accept", "application/json");
-        post.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-
-        setHandshakeProperties(post);
-
-        final HttpResponse response;
+        final HttpResponse<InputStream> response;
         if (TransferDirection.RECEIVE.equals(direction)) {
-            response = initiateTransactionForReceive(post);
+            response = sendRequest(requestBuilder);
         } else {
-            response = initiateTransactionForSend(post);
+            if (shouldCheckProxyAuth()) {
+                getController();
+            }
+            response = sendRequest(requestBuilder);
         }
 
-        final int responseCode = response.getStatusLine().getStatusCode();
+        final int responseCode = response.statusCode();
         logger.debug("initiateTransaction responseCode={}", responseCode);
 
         String transactionUrl;
-        if (responseCode == RESPONSE_CODE_CREATED) {
-            EntityUtils.consume(response.getEntity());
+        if (responseCode == HTTP_CREATED) {
+            response.body().close();
 
             transactionUrl = readTransactionUrl(response);
             if (StringUtils.isEmpty(transactionUrl)) {
                 throw new ProtocolException("Server returned 
RESPONSE_CODE_CREATED without Location header");
             }
-            final Header transportProtocolVersionHeader = 
response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
-            if (transportProtocolVersionHeader == null) {
-                throw new ProtocolException("Server didn't return confirmed 
protocol version");
+            final Optional<String> transportProtocolVersionHeader = 
response.headers().firstValue(HttpHeaders.PROTOCOL_VERSION);
+            if (transportProtocolVersionHeader.isEmpty()) {
+                throw new ProtocolException("Transport Protocol Response 
Header not found");
             }
-            final int protocolVersionConfirmedByServer = 
Integer.parseInt(transportProtocolVersionHeader.getValue());
+            final int protocolVersionConfirmedByServer = 
Integer.parseInt(transportProtocolVersionHeader.get());
             logger.debug("Finished version negotiation, 
protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
             
transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
 
-            final Header serverTransactionTtlHeader = 
response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
-            if (serverTransactionTtlHeader == null) {
-                throw new ProtocolException("Server didn't return " + 
HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
+            final Optional<String> serverTransactionTtlHeader = 
response.headers().firstValue(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
+            if (serverTransactionTtlHeader.isEmpty()) {
+                throw new ProtocolException("Transaction TTL Response Header 
not found");
             }
-            serverTransactionTtl = 
Integer.parseInt(serverTransactionTtlHeader.getValue());
+            serverTransactionTtl = 
Integer.parseInt(serverTransactionTtlHeader.get());
         } else {
-            try (InputStream content = response.getEntity().getContent()) {
+            try (InputStream content = response.body()) {
                 throw handleErrResponse(responseCode, content);
             }
         }
 
         logger.debug("initiateTransaction handshaking finished, 
transactionUrl={}", transactionUrl);
         return transactionUrl;
-
-    }
-
-    /**
-     * Initiate a transaction for receiving data.
-     * @param post a POST request to establish transaction
-     * @return POST request response
-     * @throws IOException thrown if the post request failed
-     */
-    private HttpResponse initiateTransactionForReceive(final HttpPost post) 
throws IOException {
-        return getHttpClient().execute(post);
-    }
-
-    /**
-     * <p>
-     * Initiate a transaction for sending data.
-     * </p>
-     *
-     * <p>
-     * If a proxy server requires auth, the proxy server returns 407 response 
with available auth schema such as basic or digest.
-     * Then client has to resend the same request with its credential added.
-     * This mechanism is problematic for sending data from NiFi.
-     * </p>
-     *
-     * <p>
-     * In order to resend a POST request with auth param,
-     * NiFi has to either read flow-file contents to send again, or keep the 
POST body somewhere.
-     * If we store that in memory, it would causes OOM, or storing it on disk 
slows down performance.
-     * Rolling back processing session would be overkill.
-     * Reading flow-file contents only when it's ready to send in a streaming 
way is ideal.
-     * </p>
-     *
-     * <p>
-     * Additionally, the way proxy authentication is done is vary among Proxy 
server software.
-     * Some requires 407 and resend cycle for every requests, while others 
keep a connection between a client and
-     * the proxy server, then consecutive requests skip auth steps.
-     * The problem is, that how should we behave is only told after sending a 
request to the proxy.
-     * </p>
-     *
-     * In order to handle above concerns correctly and efficiently, this 
method do the followings:
-     *
-     * <ol>
-     * <li>Send a GET request to controller resource, to initiate an 
HttpAsyncClient. The instance will be used for further requests.
-     *      This is not required by the Site-to-Site protocol, but it can 
setup proxy auth state safely.</li>
-     * <li>Send a POST request to initiate a transaction. While doing so, it 
captures how a proxy server works.
-     * If 407 and resend cycle occurs here, it implies that we need to do the 
same thing again when we actually send the data.
-     * Because if the proxy keeps using the same connection and doesn't 
require an auth step, it doesn't do so here.</li>
-     * <li>Then this method stores whether the final POST request should wait 
for the auth step.
-     * So that {@link #openConnectionForSend} can determine when to produce 
contents.</li>
-     * </ol>
-     *
-     * <p>
-     * The above special sequence is only executed when a proxy instance is 
set, and its username is set.
-     * </p>
-     *
-     * @param post a POST request to establish transaction
-     * @return POST request response
-     * @throws IOException thrown if the post request failed
-     */
-    private HttpResponse initiateTransactionForSend(final HttpPost post) 
throws IOException {
-        if (shouldCheckProxyAuth()) {
-            final CloseableHttpAsyncClient asyncClient = getHttpAsyncClient();
-            final HttpGet get = createGetControllerRequest();
-            final Future<HttpResponse> getResult = asyncClient.execute(get, 
null);
-            try {
-                final HttpResponse getResponse = 
getResult.get(readTimeoutMillis, TimeUnit.MILLISECONDS);
-                logger.debug("Proxy auth check has done. getResponse={}", 
getResponse.getStatusLine());
-            } catch (final ExecutionException e) {
-                logger.debug("Something has happened at get controller 
requesting thread for proxy auth check. {}", e.getMessage());
-                throw toIOException(e);
-            } catch (TimeoutException | InterruptedException e) {
-                throw new IOException(e);
-            }
-        }
-
-        final HttpAsyncRequestProducer asyncRequestProducer = new 
HttpAsyncRequestProducer() {
-            private boolean requestHasBeenReset = false;
-
-            @Override
-            public HttpHost getTarget() {
-                return URIUtils.extractHost(post.getURI());
-            }
-
-            @Override
-            public HttpRequest generateRequest() {
-                final BasicHttpEntity entity = new BasicHttpEntity();
-                post.setEntity(entity);
-                return post;
-            }
-
-            @Override
-            public void produceContent(ContentEncoder encoder, IOControl 
ioctrl) throws IOException {
-                encoder.complete();
-                if (shouldCheckProxyAuth() && requestHasBeenReset) {
-                    logger.debug("Produced content again, assuming the proxy 
server requires authentication.");
-                    proxyAuthRequiresResend.set(true);
-                }
-            }
-
-            @Override
-            public void requestCompleted(HttpContext context) {
-                debugProxyAuthState(context);
-            }
-
-            @Override
-            public void failed(Exception ex) {
-                final String msg = String.format("Failed to create transaction 
for %s", post.getURI());
-                logger.error(msg, ex);
-                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
msg);
-            }
-
-            @Override
-            public boolean isRepeatable() {
-                return true;
-            }
-
-            @Override
-            public void resetRequest() {
-                requestHasBeenReset = true;
-            }
-
-            @Override
-            public void close() {
-            }
-        };
-
-        final Future<HttpResponse> responseFuture = 
getHttpAsyncClient().execute(asyncRequestProducer, new 
BasicAsyncResponseConsumer(), null);
-        final HttpResponse response;
-        try {
-            response = responseFuture.get(readTimeoutMillis, 
TimeUnit.MILLISECONDS);
-
-        } catch (final ExecutionException e) {
-            logger.debug("Something has happened at initiate transaction 
requesting thread. {}", e.getMessage());
-            throw toIOException(e);
-        } catch (TimeoutException | InterruptedException e) {
-            throw new IOException(e);
-        }
-        return response;
-    }
-
-    /**
-     * Print AuthState in HttpContext for debugging purpose.
-     * <p>
-     * If the proxy server requires 407 and resend cycle, this method logs as 
followings, for Basic Auth:
-     * <ul><li>state:UNCHALLENGED;</li>
-     * <li>state:CHALLENGED;auth scheme:basic;credentials present</li></ul>
-     * </p>
-     * <p>
-     * For Digest Auth:
-     * <ul><li>state:UNCHALLENGED;</li>
-     * <li>state:CHALLENGED;auth scheme:digest;credentials present</li></ul>
-     * </p>
-     * <p>
-     * But if the proxy uses the same connection, it doesn't return 407, in 
such case
-     * this method is called only once with:
-     * <ul><li>state:UNCHALLENGED</li></ul>
-     * </p>
-     */
-    private void debugProxyAuthState(HttpContext context) {
-        final AuthState proxyAuthState;
-        if (shouldCheckProxyAuth()
-                && logger.isDebugEnabled()
-                && (proxyAuthState = (AuthState) 
context.getAttribute("http.auth.proxy-scope")) != null) {
-            logger.debug("authProxyScope={}", proxyAuthState);
-        }
     }
 
     private IOException toIOException(ExecutionException e) {
@@ -680,293 +350,113 @@ public class SiteToSiteRestApiClient implements 
Closeable {
     }
 
     public boolean openConnectionForReceive(final String transactionUrl, final 
Peer peer) throws IOException {
-
-        final HttpGet get = createGet(transactionUrl + "/flow-files");
-        // Set uri so that it'll be used as transit uri.
-        ((HttpCommunicationsSession) 
peer.getCommunicationsSession()).setDataTransferUrl(get.getURI().toString());
-
-        get.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-
-        setHandshakeProperties(get);
-
-        final CloseableHttpResponse response = getHttpClient().execute(get);
-        final int responseCode = response.getStatusLine().getStatusCode();
-        logger.debug("responseCode={}", responseCode);
-
-        boolean keepItOpen = false;
-        try {
-            switch (responseCode) {
-                case RESPONSE_CODE_OK:
-                    logger.debug("Server returned RESPONSE_CODE_OK, indicating 
there was no data.");
-                    EntityUtils.consume(response.getEntity());
-                    return false;
-
-                case RESPONSE_CODE_ACCEPTED:
-                    final InputStream httpIn = 
response.getEntity().getContent();
-                    final InputStream streamCapture = new InputStream() {
-                        boolean closed = false;
-
-                        @Override
-                        public int read() throws IOException {
-                            if (closed) {
-                                return -1;
-                            }
-                            final int r = httpIn.read();
-                            if (r < 0) {
-                                closed = true;
-                                logger.debug("Reached to end of input stream. 
Closing resources...");
-                                stopExtendingTransaction();
-                                closeSilently(httpIn);
-                                closeSilently(response);
-                            }
-                            return r;
+        final URI uri = getUri(transactionUrl + "/flow-files");
+        final HttpCommunicationsSession session = (HttpCommunicationsSession) 
peer.getCommunicationsSession();
+        session.setDataTransferUrl(uri.toString());
+
+        final HttpRequest.Builder requestBuilder = 
HttpRequest.newBuilder(uri).GET();
+        requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
+
+        final HttpResponse<InputStream> response = sendRequest(requestBuilder);
+        final int responseCode = response.statusCode();
+        switch (responseCode) {
+            case HTTP_OK:
+                logger.debug("Server returned RESPONSE_CODE_OK, indicating 
there was no data.");
+                response.body().close();
+                return false;
+            case HTTP_ACCEPTED:
+                final InputStream httpIn = response.body();
+                final InputStream streamCapture = new InputStream() {
+                    boolean closed = false;
+                    @Override
+                    public int read() throws IOException {
+                        if (closed) {
+                            return -1;
                         }
-                    };
-                    ((HttpInput) 
peer.getCommunicationsSession().getInput()).setInputStream(streamCapture);
-
-                    startExtendingTransaction(transactionUrl);
-                    keepItOpen = true;
-                    return true;
-
-                default:
-                    try (InputStream content = 
response.getEntity().getContent()) {
-                        throw handleErrResponse(responseCode, content);
+                        final int r = httpIn.read();
+                        if (r < 0) {
+                            closed = true;
+                            logger.debug("Reached to end of input stream. 
Closing resources...");
+                            stopExtendingTransaction();
+                            closeSilently(httpIn);
+                        }
+                        return r;
                     }
-            }
-        } finally {
-            if (!keepItOpen) {
-                response.close();
-            }
+                };
+                ((HttpInput) session.getInput()).setInputStream(streamCapture);
+                startExtendingTransaction(transactionUrl);
+                return true;
+            default:
+                try (InputStream content = response.body()) {
+                    throw handleErrResponse(responseCode, content);
+                }
         }
     }
 
-
     public void openConnectionForSend(final String transactionUrl, final Peer 
peer) throws IOException {
-
-        final CommunicationsSession commSession = 
peer.getCommunicationsSession();
         final String flowFilesPath = transactionUrl + "/flow-files";
-        final HttpPost post = createPost(flowFilesPath);
-        // Set uri so that it'll be used as transit uri.
-        ((HttpCommunicationsSession) 
peer.getCommunicationsSession()).setDataTransferUrl(post.getURI().toString());
+        final URI uri = getUri(flowFilesPath);
+        final HttpCommunicationsSession session = (HttpCommunicationsSession) 
peer.getCommunicationsSession();
+        session.setDataTransferUrl(uri.toString());
 
-        post.setHeader("Content-Type", "application/octet-stream");
-        post.setHeader("Accept", "text/plain");
-        post.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+        final HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(uri);
+        requestBuilder.setHeader("Content-Type", "application/octet-stream");
+        requestBuilder.setHeader(ACCEPT_HEADER, "text/plain");
+        requestBuilder.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+        setRequestHeaders(requestBuilder);
 
-        setHandshakeProperties(post);
-
-        final CountDownLatch initConnectionLatch = new CountDownLatch(1);
-
-        final URI requestUri = post.getURI();
         final PipedOutputStream outputStream = new PipedOutputStream();
         final PipedInputStream inputStream = new 
PipedInputStream(outputStream, DATA_PACKET_CHANNEL_READ_BUFFER_SIZE);
-        final ReadableByteChannel dataPacketChannel = 
Channels.newChannel(inputStream);
-        final HttpAsyncRequestProducer asyncRequestProducer = new 
HttpAsyncRequestProducer() {
-
-            private final ByteBuffer buffer = 
ByteBuffer.allocate(DATA_PACKET_CHANNEL_READ_BUFFER_SIZE);
-
-            private long totalRead = 0;
-            private long totalProduced = 0;
-
-            private boolean requestHasBeenReset = false;
-
-
-            @Override
-            public HttpHost getTarget() {
-                return URIUtils.extractHost(requestUri);
-            }
-
-            @Override
-            public HttpRequest generateRequest() {
-
-                // Pass the output stream so that Site-to-Site client thread 
can send
-                // data packet through this connection.
-                logger.debug("sending data to {} has started...", 
flowFilesPath);
-                ((HttpOutput) 
commSession.getOutput()).setOutputStream(outputStream);
-                initConnectionLatch.countDown();
-
-                final BasicHttpEntity entity = new BasicHttpEntity();
-                entity.setChunked(true);
-                entity.setContentType("application/octet-stream");
-                post.setEntity(entity);
-                return post;
-            }
-
-            private final AtomicBoolean bufferHasRemainingData = new 
AtomicBoolean(false);
-
-            /**
-             * If the proxy server requires authentication, the same POST 
request has to be sent again.
-             * The first request will result 407, then the next one will be 
sent with auth headers and actual data.
-             * This method produces a content only when it's need to be sent, 
to avoid producing the flow-file contents twice.
-             * Whether we need to wait auth is determined heuristically by the 
previous POST request which creates transaction.
-             * See {@link 
SiteToSiteRestApiClient#initiateTransactionForSend(HttpPost)} for further 
detail.
-             */
-            @Override
-            public void produceContent(final ContentEncoder encoder, final 
IOControl ioControl) throws IOException {
-
-                if (shouldCheckProxyAuth() && proxyAuthRequiresResend.get() && 
!requestHasBeenReset) {
-                    logger.debug("Need authentication with proxy server. 
Postpone producing content.");
-                    encoder.complete();
-                    return;
-                }
-
-                if (bufferHasRemainingData.get()) {
-                    // If there's remaining buffer last time, send it first.
-                    writeBuffer(encoder);
-                    if (bufferHasRemainingData.get()) {
-                        return;
-                    }
-                }
-
-                int read;
-                // This read() blocks until data becomes available,
-                // or corresponding outputStream is closed.
-                if ((read = dataPacketChannel.read(buffer)) > -1) {
-
-                    logger.trace("Read {} bytes from dataPacketChannel. {}", 
read, flowFilesPath);
-                    totalRead += read;
-
-                    buffer.flip();
-                    writeBuffer(encoder);
-
-                } else {
-
-                    final long totalWritten = 
commSession.getOutput().getBytesWritten();
-                    logger.debug("sending data to {} has reached to its end. 
produced {} bytes by reading {} bytes from channel. {} bytes written in this 
transaction.",
-                            flowFilesPath, totalProduced, totalRead, 
totalWritten);
-
-                    if (totalRead != totalWritten || totalProduced != 
totalWritten) {
-                        final String msg = "Sending data to %s has reached to 
its end, but produced : read : wrote byte sizes (%d : %d : %d) were not equal. 
Something went wrong.";
-                        throw new RuntimeException(String.format(msg, 
flowFilesPath, totalProduced, totalRead, totalWritten));
-                    }
-                    transferDataLatch.countDown();
-                    encoder.complete();
-                    dataPacketChannel.close();
-                }
-
-            }
-
-            private void writeBuffer(ContentEncoder encoder) throws 
IOException {
-                while (buffer.hasRemaining()) {
-                    final int written = encoder.write(buffer);
-                    logger.trace("written {} bytes to encoder.", written);
-                    if (written == 0) {
-                        logger.trace("Buffer still has remaining. {}", buffer);
-                        bufferHasRemainingData.set(true);
-                        return;
-                    }
-                    totalProduced += written;
-                }
-                bufferHasRemainingData.set(false);
-                buffer.clear();
-            }
-
-            @Override
-            public void requestCompleted(final HttpContext context) {
-                logger.debug("Sending data to {} completed.", flowFilesPath);
-                debugProxyAuthState(context);
-            }
-
-            @Override
-            public void failed(final Exception ex) {
-                final String msg = String.format("Failed to send data to %s 
due to %s", flowFilesPath, ex.toString());
-                logger.error(msg, ex);
-                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
msg);
-            }
-
-            @Override
-            public boolean isRepeatable() {
-                // In order to pass authentication, request has to be 
repeatable.
-                return true;
-            }
-
-            @Override
-            public void resetRequest() {
-                logger.debug("Sending data request to {} has been reset...", 
flowFilesPath);
-                requestHasBeenReset = true;
-            }
-
-            @Override
-            public void close() {
-                logger.debug("Closing sending data request to {}", 
flowFilesPath);
-                closeSilently(outputStream);
-                closeSilently(dataPacketChannel);
-                stopExtendingTransaction();
-            }
-        };
-
-        postResult = getHttpAsyncClient().execute(asyncRequestProducer, new 
BasicAsyncResponseConsumer(), null);
-
-        try {
-            // Need to wait the post request actually started so that we can 
write to its output stream.
-            if (!initConnectionLatch.await(connectTimeoutMillis, 
TimeUnit.MILLISECONDS)) {
-                throw new IOException("Awaiting initConnectionLatch has been 
timeout.");
-            }
-
-            // Started.
-            transferDataLatch = new CountDownLatch(1);
-            startExtendingTransaction(transactionUrl);
-
-        } catch (final InterruptedException e) {
-            throw new IOException("Awaiting initConnectionLatch has been 
interrupted.", e);
-        }
+        final HttpOutput httpOutput = (HttpOutput) session.getOutput();
+        httpOutput.setOutputStream(outputStream);
 
+        requestBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(
+                () -> inputStream
+        ));
+        final HttpRequest request = requestBuilder.build();
+        transactionFuture = httpClient.sendAsync(request, 
HttpResponse.BodyHandlers.ofString());
+        startExtendingTransaction(transactionUrl);
     }
 
     public void finishTransferFlowFiles(final CommunicationsSession 
commSession) throws IOException {
-
-        if (postResult == null) {
-            new IllegalStateException("Data transfer has not started yet.");
+        if (transactionFuture == null) {
+            throw new IllegalStateException("Data Transfer not started");
         }
 
-        // No more data can be sent.
-        // Close PipedOutputStream so that dataPacketChannel doesn't get 
blocked.
-        // If we don't close this output stream, then PipedInputStream loops 
infinitely at read().
+        // Close PipedOutputStream from openConnectionForSend() to avoid 
blocking on PipedInputStream.read()
         commSession.getOutput().getOutputStream().close();
-        logger.debug("{} FinishTransferFlowFiles no more data can be sent", 
this);
-
-        try {
-            if (!transferDataLatch.await(requestExpirationMillis, 
TimeUnit.MILLISECONDS)) {
-                throw new IOException("Awaiting transferDataLatch has been 
timeout.");
-            }
-        } catch (final InterruptedException e) {
-            throw new IOException("Awaiting transferDataLatch has been 
interrupted.", e);
-        }
 
         stopExtendingTransaction();
 
-        final HttpResponse response;
+        final HttpResponse<String> response;
         try {
-            response = postResult.get(readTimeoutMillis, 
TimeUnit.MILLISECONDS);
+            response = transactionFuture.get(readTimeoutMillis, 
TimeUnit.MILLISECONDS);
         } catch (final ExecutionException e) {
-            logger.debug("Something has happened at sending data thread. {}", 
e.getMessage());
             throw toIOException(e);
         } catch (TimeoutException | InterruptedException e) {
             throw new IOException(e);
         }
 
-        final int responseCode = response.getStatusLine().getStatusCode();
-        if (responseCode == RESPONSE_CODE_ACCEPTED) {
-            final String receivedChecksum = 
EntityUtils.toString(response.getEntity());
+        final int responseCode = response.statusCode();
+        if (responseCode == HTTP_ACCEPTED) {
+            final String receivedChecksum = response.body();
             ((HttpInput) commSession.getInput()).setInputStream(new 
ByteArrayInputStream(receivedChecksum.getBytes()));
             ((HttpCommunicationsSession) 
commSession).setChecksum(receivedChecksum);
             logger.debug("receivedChecksum={}", receivedChecksum);
         } else {
-            try (InputStream content = response.getEntity().getContent()) {
+            try (InputStream content = new 
ByteArrayInputStream(response.body().getBytes(StandardCharsets.UTF_8))) {
                 throw handleErrResponse(responseCode, content);
             }
         }
     }
 
     private void startExtendingTransaction(final String transactionUrl) {
-        if (ttlExtendingFuture != null) {
-            return;
+        if (ttlExtendingFuture == null) {
+            final int extendFrequency = serverTransactionTtl / 2;
+            logger.debug("Extend Transaction Started [{}] Frequency [{} 
seconds]", transactionUrl, extendFrequency);
+            final Runnable command = new ExtendTransactionCommand(this, 
transactionUrl, eventReporter);
+            ttlExtendingFuture = 
ttlExtendTaskExecutor.scheduleWithFixedDelay(command, extendFrequency, 
extendFrequency, TimeUnit.SECONDS);
         }
-
-        final int extendFrequency = serverTransactionTtl / 2;
-        logger.debug("Extend Transaction Started [{}] Frequency [{} seconds]", 
transactionUrl, extendFrequency);
-        final Runnable command = new ExtendTransactionCommand(this, 
transactionUrl, eventReporter);
-        ttlExtendingFuture = 
ttlExtendTaskExecutor.scheduleWithFixedDelay(command, extendFrequency, 
extendFrequency, TimeUnit.SECONDS);
     }
 
     private void closeSilently(final Closeable closeable) {
@@ -975,35 +465,15 @@ public class SiteToSiteRestApiClient implements Closeable 
{
                 closeable.close();
             }
         } catch (final IOException e) {
-            logger.warn("Got an exception when closing {}: {}", closeable, 
e.getMessage());
-            if (logger.isDebugEnabled()) {
-                logger.warn("", e);
-            }
+            logger.debug("Failed close [{}]", closeable, e);
         }
     }
 
     public TransactionResultEntity extendTransaction(final String 
transactionUrl) throws IOException {
         logger.debug("Sending extendTransaction request to transactionUrl: 
{}", transactionUrl);
-
-        final HttpPut put = createPut(transactionUrl);
-
-        put.setHeader("Accept", "application/json");
-        put.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-
-        setHandshakeProperties(put);
-
-        try (final CloseableHttpResponse response = 
getHttpClient().execute(put)) {
-            final int responseCode = response.getStatusLine().getStatusCode();
-            logger.debug("extendTransaction responseCode={}", responseCode);
-
-            try (final InputStream content = 
response.getEntity().getContent()) {
-                if (responseCode == RESPONSE_CODE_OK) {
-                    return readResponse(content);
-                }
-                throw handleErrResponse(responseCode, content);
-            }
-        }
-
+        final URI uri = getUri(transactionUrl);
+        final HttpRequest.Builder requestBuilder = 
HttpRequest.newBuilder(uri).PUT(HttpRequest.BodyPublishers.noBody());
+        return send(requestBuilder, TransactionResultEntity.class);
     }
 
     private void stopExtendingTransaction() {
@@ -1018,10 +488,6 @@ public class SiteToSiteRestApiClient implements Closeable 
{
     }
 
     private IOException handleErrResponse(final int responseCode, final 
InputStream in) throws IOException {
-        if (in == null) {
-            return new IOException("Unexpected response code: " + 
responseCode);
-        }
-
         final TransactionResultEntity errEntity = readResponse(in);
         final ResponseCode errCode = 
ResponseCode.fromCode(errEntity.getResponseCode());
 
@@ -1029,7 +495,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             case UNKNOWN_PORT -> new 
UnknownPortException(errEntity.getMessage());
             case PORT_NOT_IN_VALID_STATE -> new 
PortNotRunningException(errEntity.getMessage());
             default -> {
-                if (responseCode == RESPONSE_CODE_FORBIDDEN) {
+                if (responseCode == HTTP_FORBIDDEN) {
                     yield new HandshakeException(errEntity.getMessage());
                 }
                 yield new IOException("Unexpected response code: " + 
responseCode + " errCode:" + errCode + " errMessage:" + errEntity.getMessage());
@@ -1044,16 +510,9 @@ public class SiteToSiteRestApiClient implements Closeable 
{
         String responseMessage = null;
 
         try {
-            responseMessage = new String(bos.toByteArray(), 
StandardCharsets.UTF_8);
-            logger.debug("readResponse responseMessage={}", responseMessage);
-
-            final ObjectMapper mapper = new ObjectMapper();
-            return mapper.readValue(responseMessage, 
TransactionResultEntity.class);
-        } catch (JsonParseException | JsonMappingException e) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Failed to parse JSON.", e);
-            }
-
+            responseMessage = bos.toString(StandardCharsets.UTF_8);
+            return objectMapper.readValue(responseMessage, 
TransactionResultEntity.class);
+        } catch (final JsonParseException | JsonMappingException e) {
             final TransactionResultEntity entity = new 
TransactionResultEntity();
             entity.setResponseCode(ResponseCode.ABORT.getCode());
             entity.setMessage(responseMessage);
@@ -1061,41 +520,41 @@ public class SiteToSiteRestApiClient implements 
Closeable {
         }
     }
 
-    private String readTransactionUrl(final HttpResponse response) {
-        final Header locationUriIntentHeader = 
response.getFirstHeader(LOCATION_URI_INTENT_NAME);
+    private String readTransactionUrl(final HttpResponse<InputStream> 
response) {
+        final Optional<String> locationUriIntentHeader = 
response.headers().firstValue(LOCATION_URI_INTENT_NAME);
         logger.debug("locationUriIntentHeader={}", locationUriIntentHeader);
 
-        if (locationUriIntentHeader != null && 
LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.getValue())) {
-            final Header transactionUrl = 
response.getFirstHeader(LOCATION_HEADER_NAME);
+        if (locationUriIntentHeader.isPresent() && 
LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.get())) {
+            final Optional<String> transactionUrl = 
response.headers().firstValue(LOCATION_HEADER_NAME);
             logger.debug("transactionUrl={}", transactionUrl);
 
-            if (transactionUrl != null) {
-                return transactionUrl.getValue();
+            if (transactionUrl.isPresent()) {
+                return transactionUrl.get();
             }
         }
 
         return null;
     }
 
-    private void setHandshakeProperties(final HttpRequestBase httpRequest) {
+    private void setRequestHeaders(final HttpRequest.Builder requestBuilder) {
         if (compress) {
-            httpRequest.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION, "true");
+            requestBuilder.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION, 
Boolean.TRUE.toString());
         }
 
         if (requestExpirationMillis > 0) {
-            httpRequest.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION, 
String.valueOf(requestExpirationMillis));
+            requestBuilder.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION, 
String.valueOf(requestExpirationMillis));
         }
 
         if (batchCount > 0) {
-            httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT, 
String.valueOf(batchCount));
+            requestBuilder.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT, 
String.valueOf(batchCount));
         }
 
         if (batchSize > 0) {
-            httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE, 
String.valueOf(batchSize));
+            requestBuilder.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE, 
String.valueOf(batchSize));
         }
 
         if (batchDurationMillis > 0) {
-            httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION, 
String.valueOf(batchDurationMillis));
+            requestBuilder.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION, 
String.valueOf(batchDurationMillis));
         }
     }
 
@@ -1116,114 +575,18 @@ public class SiteToSiteRestApiClient implements 
Closeable {
         return url;
     }
 
-
-    private HttpGet createGet(final String path) {
-        final URI url = getUri(path);
-        final HttpGet get = new HttpGet(url);
-        get.setConfig(getRequestConfig());
-        return get;
-    }
-
-    private HttpPost createPost(final String path) {
-        final URI url = getUri(path);
-        final HttpPost post = new HttpPost(url);
-        post.setConfig(getRequestConfig());
-        return post;
-    }
-
-    private HttpPut createPut(final String path) {
-        final URI url = getUri(path);
-        final HttpPut put = new HttpPut(url);
-        put.setConfig(getRequestConfig());
-        return put;
-    }
-
-    private HttpDelete createDelete(final String path) {
-        final URI url = getUri(path);
-        final HttpDelete delete = new HttpDelete(url);
-        delete.setConfig(getRequestConfig());
-        return delete;
-    }
-
-    private String execute(final HttpGet get) throws IOException {
-        final CloseableHttpClient httpClient = getHttpClient();
-
-        if (logger.isTraceEnabled()) {
-            Arrays.stream(get.getAllHeaders()).forEach(h -> logger.debug("REQ| 
{}", h));
-        }
-
-        try (final CloseableHttpResponse response = httpClient.execute(get)) {
-            if (logger.isTraceEnabled()) {
-                Arrays.stream(response.getAllHeaders()).forEach(h -> 
logger.debug("RES| {}", h));
-            }
-
-            final StatusLine statusLine = response.getStatusLine();
-            final int statusCode = statusLine.getStatusCode();
-            if (RESPONSE_CODE_OK != statusCode) {
-                throw new HttpGetFailedException(statusCode, 
statusLine.getReasonPhrase(), null);
-            }
-            final HttpEntity entity = response.getEntity();
-            final String responseMessage = EntityUtils.toString(entity);
-            return responseMessage;
-        }
-    }
-
-    public class HttpGetFailedException extends IOException {
-        private static final long serialVersionUID = 7920714957269466946L;
-
-        private final int responseCode;
-        private final String responseMessage;
-        private final String explanation;
-
-        public HttpGetFailedException(final int responseCode, final String 
responseMessage, final String explanation) {
-            super("response code " + responseCode + ":" + responseMessage + " 
with explanation: " + explanation);
-            this.responseCode = responseCode;
-            this.responseMessage = responseMessage;
-            this.explanation = explanation;
-        }
-
-        public int getResponseCode() {
-            return responseCode;
-        }
-
-        public String getDescription() {
-            return StringUtils.isNotEmpty(explanation) ? explanation : 
responseMessage;
-        }
-    }
-
-
-    private <T> T execute(final HttpGet get, final Class<T> entityClass) 
throws IOException {
-        get.setHeader("Accept", "application/json");
-        final String responseMessage = execute(get);
-
-        final ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-
-        try {
-            return mapper.readValue(responseMessage, entityClass);
-        } catch (JsonParseException e) {
-            final String msg = "Failed to parse Json. The specified URL " + 
baseUrl + " is not a proper remote NiFi endpoint for Site-to-Site 
communication.";
-            logger.warn("{} requestedUrl={}, response={}", msg, get.getURI(), 
responseMessage);
-            throw new IOException(msg, e);
-        }
-    }
-
     public String getBaseUrl() {
         return baseUrl;
     }
 
-    /**
-     * Set the baseUrl as it is, without altering or adjusting the specified 
url string.
-     * If the url is specified by user input, and if it needs to be resolved 
with leniency,
-     * then use {@link #resolveBaseUrl(String)} method before passing it to 
this method.
-     * @param baseUrl url to set
-     */
     public void setBaseUrl(final String baseUrl) {
         this.baseUrl = baseUrl;
     }
 
     public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
-        this.connectTimeoutMillis = connectTimeoutMillis;
+        
httpClientBuilder.connectTimeout(Duration.ofMillis(connectTimeoutMillis));
+        httpClient.close();
+        httpClient = httpClientBuilder.build();
     }
 
     public void setReadTimeoutMillis(final int readTimeoutMillis) {
@@ -1234,144 +597,12 @@ public class SiteToSiteRestApiClient implements 
Closeable {
         this.cacheExpirationMillis = expirationMillis;
     }
 
-    public static String getFirstUrl(final String clusterUrlStr) {
-        if (clusterUrlStr == null) {
-            return null;
-        }
-
-        final int commaIndex = clusterUrlStr.indexOf(',');
-        if (commaIndex > -1) {
-            return clusterUrlStr.substring(0, commaIndex);
-        }
-        return clusterUrlStr;
-    }
-
-    /**
-     * Parse the comma-separated URLs string for the remote NiFi instances.
-     * @return A set containing one or more URLs
-     * @throws IllegalArgumentException when it fails to parse the URLs string,
-     * URLs string contains multiple protocols (http and https mix),
-     * or none of URL is specified.
-     */
-    public static Set<String> parseClusterUrls(final String clusterUrlStr) {
-        final Set<String> urls = new LinkedHashSet<>();
-        if (clusterUrlStr != null && !clusterUrlStr.isEmpty()) {
-            Arrays.stream(clusterUrlStr.split(","))
-                    .map(s -> s.trim())
-                    .filter(s -> !s.isEmpty())
-                    .forEach(s -> {
-                        validateUriString(s);
-                        urls.add(resolveBaseUrl(s).intern());
-                    });
-        }
-
-        if (urls.isEmpty()) {
-            throw new IllegalArgumentException("Cluster URL was not 
specified.");
-        }
-
-        final Predicate<String> isHttps = url -> 
url.toLowerCase().startsWith("https:");
-        if (urls.stream().anyMatch(isHttps) && 
urls.stream().anyMatch(isHttps.negate())) {
-            throw new IllegalArgumentException("Different protocols are used 
in the cluster URLs " + clusterUrlStr);
-        }
-
-        return Collections.unmodifiableSet(urls);
-    }
-
-    private static void validateUriString(String s) {
-        // parse the uri
-        final URI uri;
-        try {
-            uri = URI.create(s);
-        } catch (final IllegalArgumentException e) {
-            throw new IllegalArgumentException("The specified remote process 
group URL is malformed: " + s);
-        }
-
-        // validate each part of the uri
-        if (uri.getScheme() == null || uri.getHost() == null) {
-            throw new IllegalArgumentException("The specified remote process 
group URL is malformed: " + s);
-        }
-
-        if (!(uri.getScheme().equalsIgnoreCase("http") || 
uri.getScheme().equalsIgnoreCase("https"))) {
-            throw new IllegalArgumentException("The specified remote process 
group URL is invalid because it is not http or https: " + s);
-        }
-    }
-
-    private static String resolveBaseUrl(final String clusterUrl) {
-        Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null.");
-        final URI uri;
-        try {
-            uri = new URI(clusterUrl.trim());
-        } catch (final URISyntaxException e) {
-            throw new IllegalArgumentException("The specified URL is 
malformed: " + clusterUrl);
-        }
-
-        return resolveBaseUrl(uri);
-    }
-
-    /**
-     * Resolve NiFi API url with leniency. This method does following 
conversion on uri path:
-     * <ul>
-     * <li>/ to /nifi-api</li>
-     * <li>/nifi to /nifi-api</li>
-     * <li>/some/path/ to /some/path/nifi-api</li>
-     * </ul>
-     * @param clusterUrl url to be resolved
-     * @return resolved url
-     */
-    private static String resolveBaseUrl(final URI clusterUrl) {
-
-        if (clusterUrl.getScheme() == null || clusterUrl.getHost() == null) {
-            throw new IllegalArgumentException("The specified URL is 
malformed: " + clusterUrl);
-        }
-
-        if (!(clusterUrl.getScheme().equalsIgnoreCase("http") || 
clusterUrl.getScheme().equalsIgnoreCase("https"))) {
-            throw new IllegalArgumentException("The specified URL is invalid 
because it is not http or https: " + clusterUrl);
-        }
-
-
-        String uriPath = clusterUrl.getPath().trim();
-
-        if (StringUtils.isEmpty(uriPath) || uriPath.equals("/")) {
-            uriPath = "/nifi";
-        } else if (uriPath.endsWith("/")) {
-            uriPath = uriPath.substring(0, uriPath.length() - 1);
-        }
-
-        final StringBuilder uriPathBuilder = new StringBuilder(uriPath);
-        if (uriPath.endsWith("/nifi")) {
-            uriPathBuilder.append("-api");
-        } else if (!uriPath.endsWith("/nifi-api")) {
-            uriPathBuilder.append("/nifi-api");
-        }
-
-        try {
-            return new URIBuilder()
-                    .setScheme(clusterUrl.getScheme())
-                    .setHost(clusterUrl.getHost())
-                    .setPort(clusterUrl.getPort())
-                    .setPath(uriPathBuilder.toString())
-                    .build()
-                    .toString();
-        } catch (URISyntaxException e) {
-            throw new IllegalArgumentException(e);
-        }
-    }
-
     public void setBaseUrl(final String scheme, final String host, final int 
port) {
-        setBaseUrl(scheme, host, port, "/nifi-api");
-    }
-
-    private void setBaseUrl(final String scheme, final String host, final int 
port, final String path) {
         final String baseUri;
         try {
-            baseUri = new URIBuilder()
-                    .setScheme(scheme)
-                    .setHost(host)
-                    .setPort(port)
-                    .setPath(path)
-                    .build()
-                    .toString();
-        } catch (URISyntaxException e) {
+            final URI uri = new URI(scheme, null, host, port, "/nifi-api", 
null, null);
+            baseUri = uri.toString();
+        } catch (final URISyntaxException e) {
             throw new IllegalArgumentException(e);
         }
         this.setBaseUrl(baseUri);
@@ -1382,7 +613,9 @@ public class SiteToSiteRestApiClient implements Closeable {
     }
 
     public void setLocalAddress(final InetAddress localAddress) {
-        this.localAddress = localAddress;
+        httpClientBuilder.localAddress(localAddress);
+        httpClient.close();
+        httpClient = httpClientBuilder.build();
     }
 
     public void setRequestExpirationMillis(final long requestExpirationMillis) 
{
@@ -1432,50 +665,80 @@ public class SiteToSiteRestApiClient implements 
Closeable {
             urlBuilder.append("&checksum=").append(checksum);
         }
 
-        final HttpDelete delete = createDelete(urlBuilder.toString());
-        delete.setHeader("Accept", "application/json");
-        delete.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
-
-        setHandshakeProperties(delete);
-
-        try (CloseableHttpResponse response = getHttpClient().execute(delete)) 
{
-            final int responseCode = response.getStatusLine().getStatusCode();
-            logger.debug("commitReceivingFlowFiles responseCode={}", 
responseCode);
+        final HttpRequest.Builder requestBuilder = 
HttpRequest.newBuilder(getUri(urlBuilder.toString())).DELETE();
+        requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
 
-            try (InputStream content = response.getEntity().getContent()) {
-                return switch (responseCode) {
-                    case RESPONSE_CODE_OK -> readResponse(content);
-                    case RESPONSE_CODE_BAD_REQUEST -> readResponse(content);
-                    default -> throw handleErrResponse(responseCode, content);
-                };
-            }
+        final HttpResponse<InputStream> response = sendRequest(requestBuilder);
+        final int responseCode = response.statusCode();
+        try (InputStream content = response.body()) {
+            return switch (responseCode) {
+                case HTTP_OK, HTTP_BAD_REQUEST -> readResponse(content);
+                default -> throw handleErrResponse(responseCode, content);
+            };
         }
-
     }
 
     public TransactionResultEntity commitTransferFlowFiles(final String 
transactionUrl, final ResponseCode clientResponse) throws IOException {
         final String requestUrl = transactionUrl + "?responseCode=" + 
clientResponse.getCode();
         logger.debug("Sending commitTransferFlowFiles request to 
transactionUrl: {}", requestUrl);
 
-        final HttpDelete delete = createDelete(requestUrl);
-        delete.setHeader("Accept", "application/json");
-        delete.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+        final HttpRequest.Builder requestBuilder = 
HttpRequest.newBuilder(getUri(requestUrl)).DELETE();
+        requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
 
-        setHandshakeProperties(delete);
+        final HttpResponse<InputStream> response = sendRequest(requestBuilder);
+        final int responseCode = response.statusCode();
+        try (InputStream content = response.body()) {
+            return switch (responseCode) {
+                case HTTP_OK, HTTP_BAD_REQUEST -> readResponse(content);
+                default -> throw handleErrResponse(responseCode, content);
+            };
+        }
+    }
 
-        try (CloseableHttpResponse response = getHttpClient().execute(delete)) 
{
-            final int responseCode = response.getStatusLine().getStatusCode();
-            logger.debug("commitTransferFlowFiles responseCode={}", 
responseCode);
+    private <T> T send(final HttpRequest.Builder requestBuilder, final 
Class<T> responseClass) throws IOException {
+        requestBuilder.setHeader(ACCEPT_HEADER, APPLICATION_JSON);
+        final HttpResponse<InputStream> response = sendRequest(requestBuilder);
+        final int statusCode = response.statusCode();
 
-            try (InputStream content = response.getEntity().getContent()) {
-                return switch (responseCode) {
-                    case RESPONSE_CODE_OK -> readResponse(content);
-                    case RESPONSE_CODE_BAD_REQUEST -> readResponse(content);
-                    default -> throw handleErrResponse(responseCode, content);
-                };
+        try (InputStream inputStream = response.body()) {
+            if (HTTP_OK == statusCode) {
+                return objectMapper.readValue(inputStream, responseClass);
+            } else {
+                throw new IOException("Request URI [%s] HTTP 
%d".formatted(response.uri(), statusCode));
             }
         }
+    }
+
+    private HttpResponse<InputStream> sendRequest(final HttpRequest.Builder 
requestBuilder) throws IOException {
+        setRequestHeaders(requestBuilder);
+
+        requestBuilder.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+        requestBuilder.timeout(Duration.ofMillis(readTimeoutMillis));
+
+        final HttpRequest request = requestBuilder.build();
+
+        try {
+            final HttpResponse<InputStream> response = 
httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
+            final Optional<SSLSession> sslSessionFound = response.sslSession();
+            sslSessionFound.ifPresent(this::setTrustedPeerDn);
+            return response;
+        } catch (final InterruptedException e) {
+            throw new IOException("Request URI [%s] 
interrupted".formatted(request.uri()), e);
+        }
+    }
 
+    private void setTrustedPeerDn(final SSLSession sslSession) {
+        try {
+            final Certificate[] peerCertificates = 
sslSession.getPeerCertificates();
+            if (peerCertificates.length == 0) {
+                logger.info("Peer Certificates not found");
+            } else {
+                final X509Certificate peerCertificate = (X509Certificate) 
peerCertificates[0];
+                trustedPeerDn = 
StandardPrincipalFormatter.getInstance().getSubject(peerCertificate);
+            }
+        } catch (final SSLPeerUnverifiedException e) {
+            logger.warn("Peer Certificate verification failed", e);
+        }
     }
 
     private static class RemoteGroupContents {
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/SiteToSiteRestApiClientTest.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/SiteToSiteRestApiClientTest.java
new file mode 100644
index 0000000000..2fbdbdd049
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/SiteToSiteRestApiClientTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import mockwebserver3.RecordedRequest;
+import okhttp3.Headers;
+import okhttp3.HttpUrl;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.SiteToSiteEventReporter;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.protocol.http.HttpHeaders;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.remote.PeerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.api.entity.PeersEntity;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static java.net.HttpURLConnection.HTTP_ACCEPTED;
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PROXY_AUTH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+@Timeout(5)
+class SiteToSiteRestApiClientTest {
+
+    private static final int TIMEOUT = 5;
+
+    private static final String APPLICATION_JSON = "application/json";
+    private static final String APPLICATION_OCTET_STREAM = 
"application/octet-stream";
+    private static final String TEXT_PLAIN = "text/plain";
+
+    private static final String ACCEPT_HEADER = "Accept";
+    private static final String CONTENT_TYPE_HEADER = "Content-Type";
+    private static final String PROXY_AUTHENTICATE_HEADER = 
"Proxy-Authenticate";
+    private static final String PROXY_AUTHORIZATION_HEADER = 
"Proxy-Authorization";
+    private static final String BASIC_REALM = "Basic realm=\"NiFi\"";
+
+    private static final String NIFI_API_PATH = "/nifi-api";
+    private static final String SITE_TO_SITE_PATH = "/site-to-site";
+    private static final String SITE_TO_SITE_PEERS_PATH = 
"/site-to-site/peers";
+    private static final String RECEIVE_TRANSACTION_PATH = 
"/data-transfer/output-ports/port-identifier/transactions";
+    private static final String RECEIVE_TRANSACTION_FORMAT = 
"%s/data-transfer/output-ports/port-identifier/transactions";
+    private static final String SEND_TRANSACTION_PATH = 
"/data-transfer/input-ports/port-identifier/transactions";
+    private static final String SEND_TRANSACTION_FORMAT = 
"%s/data-transfer/input-ports/port-identifier/transactions";
+    private static final String FLOW_FILES_PATH_FORMAT = "%s/flow-files";
+
+    private static final String DELETE_METHOD = "DELETE";
+    private static final String GET_METHOD = "GET";
+    private static final String POST_METHOD = "POST";
+    private static final String PUT_METHOD = "PUT";
+
+    private static final String CHECKSUM_PARAMETER = "checksum";
+    private static final String RESPONSE_CODE_PARAMETER = "responseCode";
+
+    private static final int LATEST_PROTOCOL_VERSION = 5;
+    private static final String FIRST_PROTOCOL_VERSION = "1";
+    private static final String CHECKSUM = "CHECKSUM";
+    private static final String PORT_ID = "port-identifier";
+    private static final int TRANSACTION_TTL = 5000;
+    private static final int READ_TIMEOUT = 5000;
+
+    private static final String PROXY_USER = "user";
+    private static final String PROXY_PASS = "granted";
+    private static final String PROXY_CREDENTIALS = "Basic dXNlcjpncmFudGVk";
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final MockWebServer server = new MockWebServer();
+
+    private SiteToSiteRestApiClient apiClient;
+
+    @BeforeEach
+    void setApiClient() throws IOException {
+        server.start();
+        apiClient = new SiteToSiteRestApiClient(null, null, 
SiteToSiteEventReporter.DISABLED);
+        apiClient.setReadTimeoutMillis(READ_TIMEOUT);
+        apiClient.setRequestExpirationMillis(READ_TIMEOUT);
+        apiClient.setConnectTimeoutMillis(READ_TIMEOUT);
+        apiClient.setCacheExpirationMillis(READ_TIMEOUT);
+
+        final InetAddress localAddress = InetAddress.getByName("127.0.0.1");
+        apiClient.setLocalAddress(localAddress);
+    }
+
+    @AfterEach
+    void stopServer() throws IOException {
+        server.close();
+        apiClient.close();
+    }
+
+    @Test
+    void testSetBaseUrl() {
+        final HttpUrl url = server.url(NIFI_API_PATH);
+
+        apiClient.setBaseUrl(url.scheme(), url.host(), url.port());
+
+        final String baseUrl = apiClient.getBaseUrl();
+        assertEquals(url.toString(), baseUrl);
+    }
+
+    @Test
+    void testGetController() throws IOException, InterruptedException {
+        enqueueController();
+
+        final String serverUrl = getServerUrl();
+        final ControllerDTO controller = apiClient.getController(serverUrl);
+
+        assertNotNull(controller);
+        assertEquals(ControllerDTO.class.getName(), controller.getId());
+
+        assertRequestRecorded(SITE_TO_SITE_PATH);
+        assertEquals(LATEST_PROTOCOL_VERSION, 
apiClient.getTransactionProtocolVersion());
+    }
+
+    @Test
+    void testGetControllerProxy() throws IOException, InterruptedException {
+        final Proxy proxyAddress = server.getProxyAddress();
+        final InetSocketAddress proxyServerAddress = (InetSocketAddress) 
proxyAddress.address();
+        final HttpProxy httpProxy = new 
HttpProxy(proxyServerAddress.getHostName(), proxyServerAddress.getPort(), 
PROXY_USER, PROXY_PASS);
+        apiClient = new SiteToSiteRestApiClient(null, httpProxy, 
SiteToSiteEventReporter.DISABLED);
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+        apiClient.setReadTimeoutMillis(READ_TIMEOUT);
+
+        server.enqueue(new MockResponse.Builder()
+                .code(HTTP_PROXY_AUTH)
+                .setHeader(PROXY_AUTHENTICATE_HEADER, BASIC_REALM)
+                .build()
+        );
+
+        enqueueController();
+
+        final ControllerDTO controller = apiClient.getController(serverUrl);
+        assertEquals(ControllerDTO.class.getName(), controller.getId());
+
+        assertRequestRecorded(SITE_TO_SITE_PATH);
+
+        final RecordedRequest authenticatedRequest = 
assertRequestRecorded(SITE_TO_SITE_PATH);
+        final Headers authenticatedHeaders = authenticatedRequest.getHeaders();
+        final String proxyAuthorization = 
authenticatedHeaders.get(PROXY_AUTHORIZATION_HEADER);
+        assertEquals(PROXY_CREDENTIALS, proxyAuthorization);
+    }
+
+    @Test
+    void testGetPeers() throws IOException, InterruptedException {
+        final PeersEntity entity = new PeersEntity();
+        entity.setPeers(List.of());
+
+        enqueueResponseBody(entity);
+
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+
+        final Collection<PeerDTO> peers = apiClient.getPeers();
+        assertNotNull(peers);
+
+        final RecordedRequest request = 
assertRequestRecorded(SITE_TO_SITE_PEERS_PATH);
+        assertProtocolVersionFound(request);
+    }
+
+    @Test
+    void testCommitTransferFlowFiles() throws IOException, 
InterruptedException {
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+
+        final TransactionResultEntity entity = new TransactionResultEntity();
+        enqueueResponseBody(entity);
+
+        final ResponseCode responseCode = ResponseCode.PROPERTIES_OK;
+        final TransactionResultEntity resultEntity = 
apiClient.commitTransferFlowFiles(RECEIVE_TRANSACTION_PATH, responseCode);
+
+        assertNotNull(resultEntity);
+        final RecordedRequest request = 
assertRequestRecorded(RECEIVE_TRANSACTION_PATH);
+        assertEquals(DELETE_METHOD, request.getMethod());
+
+        assertResponseCodeEquals(request, responseCode);
+        assertProtocolVersionFound(request);
+    }
+
+    @Test
+    void testCommitReceivingFlowFiles() throws IOException, 
InterruptedException {
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+
+        final TransactionResultEntity entity = new TransactionResultEntity();
+        enqueueResponseBody(entity);
+
+        final ResponseCode responseCode = ResponseCode.CONFIRM_TRANSACTION;
+        final TransactionResultEntity resultEntity = 
apiClient.commitReceivingFlowFiles(RECEIVE_TRANSACTION_PATH, responseCode, 
CHECKSUM);
+
+        assertNotNull(resultEntity);
+        final RecordedRequest request = 
assertRequestRecorded(RECEIVE_TRANSACTION_PATH);
+        assertEquals(DELETE_METHOD, request.getMethod());
+
+        final String checksumParameter = 
request.getUrl().queryParameter(CHECKSUM_PARAMETER);
+        assertEquals(CHECKSUM, checksumParameter);
+
+        assertResponseCodeEquals(request, responseCode);
+        assertProtocolVersionFound(request);
+    }
+
+    @Test
+    void testInitiateTransactionCreated() throws IOException, 
InterruptedException {
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+
+        initiateTransaction(serverUrl, TransferDirection.RECEIVE);
+    }
+
+    @Test
+    void testInitiateTransactionSendCreated() throws IOException, 
InterruptedException {
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+
+        initiateTransaction(serverUrl, TransferDirection.SEND);
+    }
+
+    @Test
+    void testInitiateTransactionSendProxyAuthentication() throws IOException, 
InterruptedException {
+        final Proxy proxyAddress = server.getProxyAddress();
+        final InetSocketAddress proxyServerAddress = (InetSocketAddress) 
proxyAddress.address();
+        final HttpProxy httpProxy = new 
HttpProxy(proxyServerAddress.getHostName(), proxyServerAddress.getPort(), 
PROXY_USER, PROXY_PASS);
+        apiClient = new SiteToSiteRestApiClient(null, httpProxy, 
SiteToSiteEventReporter.DISABLED);
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+        apiClient.setReadTimeoutMillis(READ_TIMEOUT);
+
+        enqueueController();
+        enqueueTransaction(serverUrl);
+
+        final String transactionUri = 
apiClient.initiateTransaction(TransferDirection.SEND, PORT_ID);
+        assertEquals(serverUrl, transactionUri);
+
+        final RecordedRequest controllerRequest = 
assertRequestRecorded(SITE_TO_SITE_PATH);
+        assertEquals(GET_METHOD, controllerRequest.getMethod());
+
+        final RecordedRequest request = 
assertRequestRecorded(SEND_TRANSACTION_PATH);
+        assertEquals(POST_METHOD, request.getMethod());
+        assertProtocolVersionFound(request);
+    }
+
+    @Test
+    void testExtendTransaction() throws IOException, InterruptedException {
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+
+        final TransactionResultEntity entity = new TransactionResultEntity();
+        enqueueResponseBody(entity);
+
+        final TransactionResultEntity resultEntity = 
apiClient.extendTransaction(RECEIVE_TRANSACTION_PATH);
+
+        assertNotNull(resultEntity);
+        final RecordedRequest request = 
assertRequestRecorded(RECEIVE_TRANSACTION_PATH);
+        assertEquals(PUT_METHOD, request.getMethod());
+        assertProtocolVersionFound(request);
+    }
+
+    @Test
+    void testExtendTransactionServerError() throws InterruptedException {
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+
+        server.enqueue(new 
MockResponse.Builder().code(HTTP_INTERNAL_ERROR).build());
+
+        assertThrows(IOException.class, () -> 
apiClient.extendTransaction(RECEIVE_TRANSACTION_PATH));
+
+        final RecordedRequest request = 
assertRequestRecorded(RECEIVE_TRANSACTION_PATH);
+        assertEquals(PUT_METHOD, request.getMethod());
+        assertProtocolVersionFound(request);
+    }
+
+    @Test
+    void testOpenConnectionForReceive() throws IOException, 
InterruptedException {
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+        final PeerDescription peerDescription = new 
PeerDescription(server.getHostName(), server.getPort(), false);
+        final HttpCommunicationsSession session = new 
HttpCommunicationsSession();
+        final Peer peer = new Peer(peerDescription, session, serverUrl, 
serverUrl);
+
+        server.enqueue(new MockResponse.Builder().code(HTTP_OK).build());
+
+        final boolean open = 
apiClient.openConnectionForReceive(RECEIVE_TRANSACTION_PATH, peer);
+        assertFalse(open);
+
+        final RecordedRequest request = server.takeRequest(TIMEOUT, 
TimeUnit.SECONDS);
+        assertNotNull(request);
+        assertEquals(GET_METHOD, request.getMethod());
+        assertProtocolVersionFound(request);
+    }
+
+    @Test
+    void testOpenConnectionForReceiveAccepted() throws IOException, 
InterruptedException {
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+        final String receiveTransactionUri = 
RECEIVE_TRANSACTION_FORMAT.formatted(serverUrl);
+
+        final String transactionUri = 
initiateTransaction(receiveTransactionUri, TransferDirection.RECEIVE);
+
+        final PeerDescription peerDescription = new 
PeerDescription(server.getHostName(), server.getPort(), false);
+        final HttpCommunicationsSession session = new 
HttpCommunicationsSession();
+        final Peer peer = new Peer(peerDescription, session, serverUrl, 
serverUrl);
+
+        server.enqueue(new MockResponse.Builder().code(HTTP_ACCEPTED).build());
+
+        final boolean open = 
apiClient.openConnectionForReceive(transactionUri, peer);
+        assertTrue(open);
+
+        final CommunicationsInput input = session.getInput();
+        try (InputStream inputStream = input.getInputStream()) {
+            final int read = inputStream.read();
+            assertEquals(-1, read);
+        }
+
+        final String transitUri = session.createTransitUri(null, null);
+        final String expectedTransitUri = 
FLOW_FILES_PATH_FORMAT.formatted(receiveTransactionUri);
+        assertEquals(expectedTransitUri, transitUri);
+
+        final RecordedRequest request = server.takeRequest(TIMEOUT, 
TimeUnit.SECONDS);
+        assertNotNull(request);
+        assertEquals(GET_METHOD, request.getMethod());
+        assertProtocolVersionFound(request);
+    }
+
+    /**
+     * Verify that opening a connection for sending issues a POST request with the expected headers
+     * and that the response body is exposed as both the session checksum and the session input.
+     */
+    @Test
+    void testOpenConnectionForSend() throws IOException, InterruptedException {
+        final String serverUrl = getServerUrl();
+        apiClient.setBaseUrl(serverUrl);
+
+        final String sendTransactionUri = SEND_TRANSACTION_FORMAT.formatted(serverUrl);
+        final String transactionUri = initiateTransaction(sendTransactionUri, TransferDirection.SEND);
+
+        final PeerDescription peerDescription = new PeerDescription(server.getHostName(), server.getPort(), false);
+        final HttpCommunicationsSession session = new HttpCommunicationsSession();
+        final Peer peer = new Peer(peerDescription, session, serverUrl, serverUrl);
+
+        server.enqueue(new MockResponse.Builder().code(HTTP_ACCEPTED).body(CHECKSUM).build());
+
+        apiClient.openConnectionForSend(transactionUri, peer);
+
+        // Finish Transfer to complete Connection
+        apiClient.finishTransferFlowFiles(session);
+
+        assertEquals(CHECKSUM, session.getChecksum());
+        // Close the response stream after reading, as testOpenConnectionForReceiveAccepted does
+        try (InputStream inputStream = session.getInput().getInputStream()) {
+            final byte[] inputBytes = inputStream.readAllBytes();
+            assertEquals(CHECKSUM, new String(inputBytes, StandardCharsets.UTF_8));
+        }
+
+        final RecordedRequest request = server.takeRequest(TIMEOUT, TimeUnit.SECONDS);
+        assertNotNull(request);
+        assertEquals(POST_METHOD, request.getMethod());
+        assertProtocolVersionFound(request);
+
+        final Headers headers = request.getHeaders();
+        final String contentType = headers.get(CONTENT_TYPE_HEADER);
+        assertEquals(APPLICATION_OCTET_STREAM, contentType);
+        final String accept = headers.get(ACCEPT_HEADER);
+        assertEquals(TEXT_PLAIN, accept);
+    }
+
+    @Test
+    void testFinishTransferFlowFilesNotStarted() throws IOException {
+        try (CommunicationsSession session = 
mock(CommunicationsSession.class)) {
+            assertThrows(IllegalStateException.class, () -> 
apiClient.finishTransferFlowFiles(session));
+        }
+    }
+
+    private String initiateTransaction(final String serverUrl, final 
TransferDirection transferDirection) throws IOException, InterruptedException {
+        enqueueTransaction(serverUrl);
+
+        final String transactionUri = 
apiClient.initiateTransaction(transferDirection, PORT_ID);
+
+        assertEquals(serverUrl, transactionUri);
+
+        final String expectedPath;
+        if (TransferDirection.SEND == transferDirection) {
+            expectedPath = SEND_TRANSACTION_PATH;
+        } else {
+            expectedPath = RECEIVE_TRANSACTION_PATH;
+        }
+
+        final RecordedRequest request = assertRequestRecorded(expectedPath);
+        assertEquals(POST_METHOD, request.getMethod());
+        assertProtocolVersionFound(request);
+
+        return transactionUri;
+    }
+
+    private void enqueueTransaction(final String serverUrl) {
+        server.enqueue(new MockResponse.Builder()
+                .code(HTTP_CREATED)
+                .setHeader(HttpHeaders.LOCATION_URI_INTENT_NAME, 
HttpHeaders.LOCATION_URI_INTENT_VALUE)
+                .setHeader(HttpHeaders.LOCATION_HEADER_NAME, serverUrl)
+                .setHeader(HttpHeaders.PROTOCOL_VERSION, 
FIRST_PROTOCOL_VERSION)
+                .setHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL, 
Integer.toString(TRANSACTION_TTL))
+                .build()
+        );
+    }
+
+    private void enqueueController() throws JsonProcessingException {
+        final ControllerDTO expected = new ControllerDTO();
+        expected.setId(ControllerDTO.class.getName());
+        final ControllerEntity entity = new ControllerEntity();
+        entity.setController(expected);
+
+        enqueueResponseBody(entity);
+    }
+
+    private void enqueueResponseBody(final Object responseBody) throws 
JsonProcessingException {
+        final String body = objectMapper.writeValueAsString(responseBody);
+
+        server.enqueue(new 
MockResponse.Builder().code(HTTP_OK).body(body).build());
+    }
+
+    private String getServerUrl() {
+        final HttpUrl url = server.url(NIFI_API_PATH);
+        return url.toString();
+    }
+
+    private void assertProtocolVersionFound(final RecordedRequest request) {
+        final Headers headers = request.getHeaders();
+        final String protocolVersion = 
headers.get(HttpHeaders.PROTOCOL_VERSION);
+        assertEquals(FIRST_PROTOCOL_VERSION, protocolVersion);
+    }
+
+    private void assertResponseCodeEquals(final RecordedRequest request, final 
ResponseCode responseCode) {
+        final String responseCodeParameter = 
request.getUrl().queryParameter(RESPONSE_CODE_PARAMETER);
+        assertNotNull(responseCodeParameter);
+
+        final int codeParameter = Integer.parseInt(responseCodeParameter);
+        assertEquals(responseCode.getCode(), codeParameter);
+    }
+
+    private RecordedRequest assertRequestRecorded(final String expectedPath) 
throws InterruptedException {
+        final RecordedRequest request = server.takeRequest(TIMEOUT, 
TimeUnit.SECONDS);
+        assertNotNull(request);
+
+        final Headers headers = request.getHeaders();
+        final String accept = headers.get(ACCEPT_HEADER);
+
+        final String method = request.getMethod();
+        if (POST_METHOD.equals(method)) {
+            assertEquals(APPLICATION_JSON, accept);
+        }
+
+        final HttpUrl url = request.getUrl();
+        final String encodedPath = url.encodedPath();
+
+        final String path = NIFI_API_PATH + expectedPath;
+        assertEquals(path, encodedPath);
+
+        return request;
+    }
+}
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
index 9c566ae0e6..923908cef7 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java
@@ -21,7 +21,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Iterator;
 import java.util.Set;
 
-import static 
org.apache.nifi.remote.util.SiteToSiteRestApiClient.parseClusterUrls;
+import static org.apache.nifi.remote.util.ClusterUrlParser.parseClusterUrls;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git 
a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
 
b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
index c776245ae1..2e49467252 100644
--- 
a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
+++ 
b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
@@ -35,7 +35,7 @@ import org.apache.nifi.remote.SiteToSiteEventReporter;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
-import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.remote.util.ClusterUrlParser;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.ssl.SSLContextProvider;
 
@@ -155,7 +155,7 @@ public class SiteToSiteUtils {
             stateManager = ((ReportingContext) 
reportContext).getStateManager();
         }
         return new SiteToSiteClient.Builder()
-                .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
+                .urls(ClusterUrlParser.parseClusterUrls(destinationUrl))
                 
.portName(reportContext.getProperty(SiteToSiteUtils.PORT_NAME).getValue())
                 
.useCompression(reportContext.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
                 .eventReporter(eventReporter)
@@ -172,7 +172,7 @@ public class SiteToSiteUtils {
         public ValidationResult validate(final String subject, final String 
input, final ValidationContext context) {
             final String value = 
context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
             try {
-                SiteToSiteRestApiClient.parseClusterUrls(value);
+                ClusterUrlParser.parseClusterUrls(value);
                 return new ValidationResult.Builder()
                         .input(input)
                         .subject(subject)
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 1fe34b17fb..971577bb16 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
-import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.nio.charset.StandardCharsets;
@@ -91,10 +90,6 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
 
     private static final Logger logger = 
LoggerFactory.getLogger(StandardRemoteProcessGroup.class);
 
-    // status codes
-    private static final int UNAUTHORIZED_STATUS_CODE = 
HttpURLConnection.HTTP_UNAUTHORIZED;
-    private static final int FORBIDDEN_STATUS_CODE = 
HttpURLConnection.HTTP_FORBIDDEN;
-
     private final String id;
 
     private volatile String targetUris;
@@ -413,7 +408,18 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
 
     @Override
     public String getTargetUri() {
-        return SiteToSiteRestApiClient.getFirstUrl(targetUris);
+        final String targetUri;
+        if (targetUris == null) {
+            targetUri = null;
+        } else {
+            final int commaIndex = targetUris.indexOf(',');
+            if (commaIndex > -1) {
+                targetUri = targetUris.substring(0, commaIndex);
+            } else {
+                targetUri = targetUris;
+            }
+        }
+        return targetUri;
     }
 
     @Override
@@ -1256,36 +1262,24 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             }
 
             try (final SiteToSiteRestApiClient apiClient = 
getSiteToSiteRestApiClient()) {
-                try {
-                    final ControllerDTO dto = 
apiClient.getController(targetUris);
-
-                    if (dto.getRemoteSiteListeningPort() == null && 
SiteToSiteTransportProtocol.RAW.equals(transportProtocol)) {
-                        authorizationIssue = "Remote instance is not 
configured to allow RAW Site-to-Site communications at this time.";
-                    } else if (dto.getRemoteSiteHttpListeningPort() == null && 
SiteToSiteTransportProtocol.HTTP.equals(transportProtocol)) {
-                        authorizationIssue = "Remote instance is not 
configured to allow HTTP Site-to-Site communications at this time.";
-                    } else {
-                        authorizationIssue = null;
-                    }
+                final ControllerDTO dto = apiClient.getController(targetUris);
 
-                    writeLock.lock();
-                    try {
-                        listeningPort = dto.getRemoteSiteListeningPort();
-                        listeningHttpPort = 
dto.getRemoteSiteHttpListeningPort();
-                        destinationSecure = dto.isSiteToSiteSecure();
-                    } finally {
-                        writeLock.unlock();
-                    }
-                } catch (SiteToSiteRestApiClient.HttpGetFailedException e) {
-                    final int responseCode = e.getResponseCode();
-                    if (responseCode == UNAUTHORIZED_STATUS_CODE || 
responseCode == FORBIDDEN_STATUS_CODE) {
-                        authorizationIssue = e.getDescription();
-                    } else {
-                        final String message = e.getDescription();
-                        logger.warn("{} When communicating with remote 
instance, got unexpected result. {}", this, message);
-                        authorizationIssue = "Unable to determine Site-to-Site 
availability.";
-                    }
+                if (dto.getRemoteSiteListeningPort() == null && 
SiteToSiteTransportProtocol.RAW.equals(transportProtocol)) {
+                    authorizationIssue = "Remote instance is not configured to 
allow RAW Site-to-Site communications at this time.";
+                } else if (dto.getRemoteSiteHttpListeningPort() == null && 
SiteToSiteTransportProtocol.HTTP.equals(transportProtocol)) {
+                    authorizationIssue = "Remote instance is not configured to 
allow HTTP Site-to-Site communications at this time.";
+                } else {
+                    authorizationIssue = null;
                 }
 
+                writeLock.lock();
+                try {
+                    listeningPort = dto.getRemoteSiteListeningPort();
+                    listeningHttpPort = dto.getRemoteSiteHttpListeningPort();
+                    destinationSecure = dto.isSiteToSiteSecure();
+                } finally {
+                    writeLock.unlock();
+                }
             } catch (final Exception e) {
                 logger.warn("Unable to connect to {} due to {}",  
StandardRemoteProcessGroup.this, e.toString());
                 getEventReporter().reportEvent(Severity.WARNING, "Site to 
Site", String.format("Unable to connect to %s due to %s",
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 0b47c280b7..1c091c9234 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -63,10 +63,6 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>c2-protocol-component-api</artifactId>
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
index 2e3310ee99..9ff35fd068 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.controller.queue.clustered.server;
 
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.events.EventReporter;
@@ -27,7 +26,6 @@ import 
org.apache.nifi.security.cert.StandardPeerIdentityProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.SSLSession;
 import javax.net.ssl.SSLSocket;
 import java.io.IOException;
@@ -38,51 +36,66 @@ import java.util.stream.Collectors;
 public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer {
     private static final Logger logger = 
LoggerFactory.getLogger(ClusterLoadBalanceAuthorizer.class);
 
+    private static final char WILDCARD = '*';
+
     private final PeerIdentityProvider peerIdentityProvider = new 
StandardPeerIdentityProvider();
 
     private final ClusterCoordinator clusterCoordinator;
     private final EventReporter eventReporter;
-    private final HostnameVerifier hostnameVerifier;
 
     public ClusterLoadBalanceAuthorizer(final ClusterCoordinator 
clusterCoordinator, final EventReporter eventReporter) {
         this.clusterCoordinator = clusterCoordinator;
         this.eventReporter = eventReporter;
-        this.hostnameVerifier = new DefaultHostnameVerifier();
     }
 
     @Override
-    public String authorize(SSLSocket sslSocket) throws IOException {
+    public String authorize(final SSLSocket sslSocket) throws IOException {
         final SSLSession sslSession = sslSocket.getSession();
 
         final Certificate[] peerCertificates = 
sslSession.getPeerCertificates();
         final Set<String> clientIdentities = 
peerIdentityProvider.getIdentities(peerCertificates);
 
-        logger.debug("Will perform authorization against Client Identities 
'{}'", clientIdentities);
-
         final Set<String> nodeIds = 
clusterCoordinator.getNodeIdentifiers().stream()
                 .map(NodeIdentifier::getApiAddress)
                 .collect(Collectors.toSet());
 
+        logger.debug("Authorizing Peer {} against Cluster Nodes {}", 
clientIdentities, nodeIds);
+
         for (final String clientId : clientIdentities) {
             if (nodeIds.contains(clientId)) {
-                logger.debug("Client ID '{}' is in the list of Nodes in the 
Cluster. Authorizing Client to Load Balance data", clientId);
+                logger.debug("Peer Certificate Identity [{}] authorized for 
Load Balancing from Cluster Node Identifiers", clientId);
                 return clientId;
             }
         }
 
-        // If there are no matches of Client IDs, try to verify it by 
HostnameVerifier. In this way, we can support wildcard certificates.
         for (final String nodeId : nodeIds) {
-            if (hostnameVerifier.verify(nodeId, sslSession)) {
+            if (isNodeIdMatched(nodeId, clientIdentities)) {
                 final String clientId = 
sslSocket.getInetAddress().getHostName();
-                logger.debug("The request was verified with node '{}'. The 
hostname derived from the socket is '{}'. Authorizing Client to Load Balance 
data", nodeId, clientId);
+                logger.debug("Peer Socket Address [{}] for Node Identifier 
[{}] authorized for Load Balancing from Certificate Wildcard", clientId, 
nodeId);
                 return clientId;
             }
         }
 
-        final String message = "Authorization failed for Client ID's to Load 
Balance data because none of the ID's are known Cluster Node Identifiers";
-
+        final String message = "Peer Certificate Identities %s not authorized 
for Load Balancing".formatted(clientIdentities);
         logger.warn(message);
         eventReporter.reportEvent(Severity.WARNING, "Load Balanced 
Connections", message);
-        throw new NotAuthorizedException("Client ID's " + clientIdentities + " 
are not authorized to Load Balance data");
+        throw new NotAuthorizedException(message);
+    }
+
+    /**
+     * Determine whether any wildcard Peer Certificate Identity covers the Cluster Node Identifier.
+     *
+     * A wildcard identity must start with "*." followed by a domain, and the wildcard stands for
+     * exactly one non-empty leftmost label, following RFC 6125 Section 6.4.3. Identities such as
+     * "*" or "*local" are ignored: a plain suffix comparison would let "*" match every Node
+     * Identifier and "*local" match unrelated hosts. Likewise "*.local" matches "node.local"
+     * but not "a.b.local" or ".local".
+     *
+     * @param nodeId Cluster Node API address to be matched, false is returned when null
+     * @param clientIdentities Identities read from the Peer Certificate
+     * @return true when at least one wildcard identity matches the Node Identifier
+     */
+    private boolean isNodeIdMatched(final String nodeId, final Set<String> clientIdentities) {
+        if (nodeId == null) {
+            return false;
+        }
+
+        boolean matched = false;
+
+        for (final String clientIdentity : clientIdentities) {
+            // Require the "*." prefix and at least one character of domain after the separator
+            final boolean wildcardDomain = clientIdentity.length() > 2
+                    && clientIdentity.charAt(0) == WILDCARD
+                    && clientIdentity.charAt(1) == '.';
+            if (!wildcardDomain) {
+                continue;
+            }
+
+            // Suffix keeps the leading separator so that matching is aligned on a label boundary
+            final String clientIdentityDomain = clientIdentity.substring(1);
+            if (nodeId.endsWith(clientIdentityDomain)) {
+                final int labelLength = nodeId.length() - clientIdentityDomain.length();
+                final String wildcardLabel = nodeId.substring(0, labelLength);
+                // Wildcard covers a single non-empty label and never spans multiple levels
+                if (!wildcardLabel.isEmpty() && wildcardLabel.indexOf('.') == -1) {
+                    matched = true;
+                    break;
+                }
+            }
+        }
+
+        return matched;
     }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/ClusteredLoadBalanceAuthorizerTest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/ClusteredLoadBalanceAuthorizerTest.java
new file mode 100644
index 0000000000..4974f65a81
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/ClusteredLoadBalanceAuthorizerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue.clustered.server;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.security.cert.GeneralNameType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class ClusteredLoadBalanceAuthorizerTest {
+    private static final String LOCALHOST_ADDRESS = "localhost.local";
+
+    private static final String WILDCARD_LOCAL_DOMAIN = "*.local";
+
+    private static final String DNS_ADDRESS = "nifi.apache.org";
+
+    @Mock
+    private ClusterCoordinator clusterCoordinator;
+
+    @Mock
+    private SSLSocket sslSocket;
+
+    @Mock
+    private SSLSession sslSession;
+
+    @Mock
+    private X509Certificate peerCertificate;
+
+    @Mock
+    private NodeIdentifier nodeIdentifier;
+
+    private ClusterLoadBalanceAuthorizer authorizer;
+
+    @BeforeEach
+    void setAuthorizer() {
+        authorizer = new ClusterLoadBalanceAuthorizer(clusterCoordinator, 
EventReporter.NO_OP);
+    }
+
+    @Test
+    void testAuthorizeNotAuthorized() throws IOException {
+        when(sslSocket.getSession()).thenReturn(sslSession);
+        when(sslSession.getPeerCertificates()).thenReturn(new 
Certificate[]{peerCertificate});
+
+        assertThrows(NotAuthorizedException.class, () -> 
authorizer.authorize(sslSocket));
+    }
+
+    @Test
+    void testAuthorizeNodeIdentifierAuthorized() throws IOException, 
CertificateParsingException {
+        when(sslSocket.getSession()).thenReturn(sslSession);
+        when(sslSession.getPeerCertificates()).thenReturn(new 
Certificate[]{peerCertificate});
+
+        setPeerCertificate(LOCALHOST_ADDRESS);
+
+        when(nodeIdentifier.getApiAddress()).thenReturn(LOCALHOST_ADDRESS);
+        
when(clusterCoordinator.getNodeIdentifiers()).thenReturn(Set.of(nodeIdentifier));
+
+        final String authorized = authorizer.authorize(sslSocket);
+        assertEquals(LOCALHOST_ADDRESS, authorized);
+    }
+
+    /**
+     * Verify that a wildcard certificate identity covering the Node Identifier is authorized
+     * and that the returned identity is the host name of the peer socket address.
+     */
+    @Test
+    void testAuthorizeWildcardAuthorized() throws IOException, CertificateParsingException {
+        when(sslSocket.getSession()).thenReturn(sslSession);
+        when(sslSession.getPeerCertificates()).thenReturn(new Certificate[]{peerCertificate});
+        // Loopback address avoids depending on resolution of the local host name,
+        // which InetAddress.getLocalHost() requires and which can fail with UnknownHostException
+        final InetAddress socketAddress = InetAddress.getLoopbackAddress();
+        final String socketHostName = socketAddress.getHostName();
+        when(sslSocket.getInetAddress()).thenReturn(socketAddress);
+
+        setPeerCertificate(WILDCARD_LOCAL_DOMAIN);
+
+        when(nodeIdentifier.getApiAddress()).thenReturn(LOCALHOST_ADDRESS);
+        when(clusterCoordinator.getNodeIdentifiers()).thenReturn(Set.of(nodeIdentifier));
+
+        final String authorized = authorizer.authorize(sslSocket);
+        assertEquals(socketHostName, authorized);
+    }
+
+    @Test
+    void testAuthorizeWildcardNotAuthorized() throws IOException, 
CertificateParsingException {
+        when(sslSocket.getSession()).thenReturn(sslSession);
+        when(sslSession.getPeerCertificates()).thenReturn(new 
Certificate[]{peerCertificate});
+
+        setPeerCertificate(WILDCARD_LOCAL_DOMAIN);
+
+        when(nodeIdentifier.getApiAddress()).thenReturn(DNS_ADDRESS);
+        
when(clusterCoordinator.getNodeIdentifiers()).thenReturn(Set.of(nodeIdentifier));
+
+        assertThrows(NotAuthorizedException.class, () -> 
authorizer.authorize(sslSocket));
+    }
+
+    private void setPeerCertificate(final String name) throws 
CertificateParsingException {
+        final List<?> subjectAlternativeName = 
List.of(GeneralNameType.DNS_NAME.getNameType(), name);
+        final Collection<List<?>> names = List.of(subjectAlternativeName);
+        when(peerCertificate.getSubjectAlternativeNames()).thenReturn(names);
+    }
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index b19b1bbee6..88401b0fa6 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -43,7 +43,7 @@ import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.exception.UnreachableClusterException;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
-import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.remote.util.ClusterUrlParser;
 import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -162,7 +162,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         final SiteToSiteEventReporter eventReporter = (severity, category, message) -> remoteGroup.getEventReporter().reportEvent(severity, category, message);
 
         final SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder()
-                .urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris()))
+                .urls(ClusterUrlParser.parseClusterUrls(remoteGroup.getTargetUris()))
                 .portIdentifier(getTargetIdentifier())
                 .sslContext(sslContext)
                 .useCompression(isUseCompression())
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/pom.xml b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/pom.xml
index 1eac909991..85d54d28a6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/pom.xml
@@ -120,10 +120,6 @@
             <groupId>org.springframework.security</groupId>
             <artifactId>spring-security-web</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-text</artifactId>
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 51bfc22068..bb21091b16 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -74,7 +74,7 @@ import org.apache.nifi.registry.flow.FlowSnapshotContainer;
 import org.apache.nifi.registry.flow.RegisteredFlow;
 import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
 import org.apache.nifi.registry.flow.VersionedFlowState;
-import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.remote.util.ClusterUrlParser;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.Revision;
@@ -2054,7 +2054,7 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
 
                     // parse the uri to check if the uri is valid
                     final String targetUris = remoteProcessGroupDTO.getTargetUris();
-                    SiteToSiteRestApiClient.parseClusterUrls(targetUris);
+                    ClusterUrlParser.parseClusterUrls(targetUris);
 
                     // since the uri is valid, use it
                     remoteProcessGroupDTO.setTargetUris(targetUris);

Reply via email to