Repository: nifi
Updated Branches:
  refs/heads/master 2ceb5c809 -> 671301193


NIFI-2525: Fix Proxy auth issue with async send.

Without this fix, NiFi fails to send data via HTTP Site-to-Site through
Proxy which requires authentication due to AsynchronousCloseException.
It happens when async client replays producing contents in order to re-send the
request with auth credential for the proxy server, however the
connection is already closed.
This fix makes NiFi to send actual data only at the second round of requests, 
so that flow-file
contents can be sent without reading it twice.

Signed-off-by: Yolanda M. Davis <[email protected]>

This closes #915


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/67130119
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/67130119
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/67130119

Branch: refs/heads/master
Commit: 671301193bf5b05a42d0e19aaa9503424ec04bc3
Parents: 2ceb5c8
Author: Koji Kawamura <[email protected]>
Authored: Tue Aug 23 10:57:57 2016 +0900
Committer: Yolanda M. Davis <[email protected]>
Committed: Wed Aug 24 20:39:17 2016 -0400

----------------------------------------------------------------------
 nifi-commons/nifi-site-to-site-client/pom.xml   |   6 +
 .../apache/nifi/remote/AbstractTransaction.java |   2 +-
 .../remote/util/SiteToSiteRestApiClient.java    | 291 ++++++++++++++---
 .../nifi/remote/client/http/TestHttpClient.java | 312 +++++++++++++++----
 4 files changed, 497 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/67130119/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml 
b/nifi-commons/nifi-site-to-site-client/pom.xml
index c1857b5..2806607 100644
--- a/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -79,5 +79,11 @@
             <artifactId>jetty-servlet</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.littleshoot</groupId>
+            <artifactId>littleproxy</artifactId>
+            <version>1.1.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/67130119/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
index 3e700aa..0752fa1 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
@@ -296,7 +296,7 @@ public abstract class AbstractTransaction implements 
Transaction {
                         transactionResponse = readTransactionResponse();
                     } catch (final IOException e) {
                         throw new IOException(this + " Failed to receive a 
response from " + peer + " when expecting a TransactionFinished Indicator. "
-                                + "It is unknown whether or not the peer 
successfully received/processed the data.", e);
+                                + "It is unknown whether or not the peer 
successfully received/processed the data. " + e, e);
                     }
 
                     logger.debug("{} Received {} from {}", this, 
transactionResponse, peer);

http://git-wip-us.apache.org/repos/asf/nifi/blob/67130119/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index d228378..862f2cd 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -27,6 +27,7 @@ import org.apache.http.HttpResponse;
 import org.apache.http.HttpResponseInterceptor;
 import org.apache.http.StatusLine;
 import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.AuthState;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.RequestConfig;
@@ -138,6 +139,8 @@ public class SiteToSiteRestApiClient implements Closeable {
     private String baseUrl;
     protected final SSLContext sslContext;
     protected final HttpProxy proxy;
+    private final AtomicBoolean proxyAuthRequiresResend = new 
AtomicBoolean(false);
+
     private RequestConfig requestConfig;
     private CredentialsProvider credentialsProvider;
     private CloseableHttpClient httpClient;
@@ -300,8 +303,7 @@ public class SiteToSiteRestApiClient implements Closeable {
 
     public ControllerDTO getController() throws IOException {
         try {
-            final HttpGet get = createGet("/site-to-site");
-            get.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+            final HttpGet get = createGetControllerRequest();
             return execute(get, ControllerEntity.class).getController();
 
         } catch (final HttpGetFailedException e) {
@@ -314,6 +316,12 @@ public class SiteToSiteRestApiClient implements Closeable {
         }
     }
 
+    private HttpGet createGetControllerRequest() {
+        final HttpGet get = createGet("/site-to-site");
+        get.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
+        return get;
+    }
+
     public Collection<PeerDTO> getPeers() throws IOException {
         final HttpGet get = createGet("/site-to-site/peers");
         get.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
@@ -321,15 +329,10 @@ public class SiteToSiteRestApiClient implements Closeable 
{
     }
 
     public String initiateTransaction(final TransferDirection direction, final 
String portId) throws IOException {
-        if (TransferDirection.RECEIVE.equals(direction)) {
-            return initiateTransaction("output-ports", portId);
-        } else {
-            return initiateTransaction("input-ports", portId);
-        }
-    }
 
-    private String initiateTransaction(final String portType, final String 
portId) throws IOException {
+        final String portType = TransferDirection.RECEIVE.equals(direction) ? 
"output-ports" : "input-ports";
         logger.debug("initiateTransaction handshaking portType={}, portId={}", 
portType, portId);
+
         final HttpPost post = createPost("/data-transfer/" + portType + "/" + 
portId + "/transactions");
 
         post.setHeader("Accept", "application/json");
@@ -337,43 +340,223 @@ public class SiteToSiteRestApiClient implements 
Closeable {
 
         setHandshakeProperties(post);
 
-        try (CloseableHttpResponse response = getHttpClient().execute(post)) {
-            final int responseCode = response.getStatusLine().getStatusCode();
-            logger.debug("initiateTransaction responseCode={}", responseCode);
+        final HttpResponse response;
+        if (TransferDirection.RECEIVE.equals(direction)) {
+            response = initiateTransactionForReceive(post);
+        } else {
+            response = initiateTransactionForSend(post);
+        }
 
-            String transactionUrl;
-            switch (responseCode) {
-                case RESPONSE_CODE_CREATED:
-                    EntityUtils.consume(response.getEntity());
+        final int responseCode = response.getStatusLine().getStatusCode();
+        logger.debug("initiateTransaction responseCode={}", responseCode);
 
-                    transactionUrl = readTransactionUrl(response);
-                    if (isEmpty(transactionUrl)) {
-                        throw new ProtocolException("Server returned 
RESPONSE_CODE_CREATED without Location header");
-                    }
-                    final Header transportProtocolVersionHeader = 
response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
-                    if (transportProtocolVersionHeader == null) {
-                        throw new ProtocolException("Server didn't return 
confirmed protocol version");
-                    }
-                    final Integer protocolVersionConfirmedByServer = 
Integer.valueOf(transportProtocolVersionHeader.getValue());
-                    logger.debug("Finished version negotiation, 
protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
-                    
transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
+        String transactionUrl;
+        switch (responseCode) {
+            case RESPONSE_CODE_CREATED:
+                EntityUtils.consume(response.getEntity());
 
-                    final Header serverTransactionTtlHeader = 
response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
-                    if (serverTransactionTtlHeader == null) {
-                        throw new ProtocolException("Server didn't return " + 
HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
-                    }
-                    serverTransactionTtl = 
Integer.parseInt(serverTransactionTtlHeader.getValue());
-                    break;
+                transactionUrl = readTransactionUrl(response);
+                if (isEmpty(transactionUrl)) {
+                    throw new ProtocolException("Server returned 
RESPONSE_CODE_CREATED without Location header");
+                }
+                final Header transportProtocolVersionHeader = 
response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
+                if (transportProtocolVersionHeader == null) {
+                    throw new ProtocolException("Server didn't return 
confirmed protocol version");
+                }
+                final Integer protocolVersionConfirmedByServer = 
Integer.valueOf(transportProtocolVersionHeader.getValue());
+                logger.debug("Finished version negotiation, 
protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
+                
transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
 
-                default:
-                    try (InputStream content = 
response.getEntity().getContent()) {
-                        throw handleErrResponse(responseCode, content);
-                    }
+                final Header serverTransactionTtlHeader = 
response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
+                if (serverTransactionTtlHeader == null) {
+                    throw new ProtocolException("Server didn't return " + 
HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
+                }
+                serverTransactionTtl = 
Integer.parseInt(serverTransactionTtlHeader.getValue());
+                break;
+
+            default:
+                try (InputStream content = response.getEntity().getContent()) {
+                    throw handleErrResponse(responseCode, content);
+                }
+        }
+        logger.debug("initiateTransaction handshaking finished, 
transactionUrl={}", transactionUrl);
+        return transactionUrl;
+
+    }
+
+    /**
+     * Initiate a transaction for receiving data.
+     * @param post a POST request to establish transaction
+     * @return POST request response
+     * @throws IOException thrown if the post request failed
+     */
+    private HttpResponse initiateTransactionForReceive(final HttpPost post) 
throws IOException {
+        return getHttpClient().execute(post);
+    }
+
+    /**
+     * <p>
+     * Initiate a transaction for sending data.
+     * </p>
+     *
+     * <p>
+     * If a proxy server requires auth, the proxy server returns 407 response 
with available auth schema such as basic or digest.
+     * Then client has to resend the same request with its credential added.
+     * This mechanism is problematic for sending data from NiFi.
+     * </p>
+     *
+     * <p>
+     * In order to resend a POST request with auth param,
+     * NiFi has to either read flow-file contents to send again, or keep the 
POST body somewhere.
+     * If we store that in memory, it would causes OOM, or storing it on disk 
slows down performance.
+     * Rolling back processing session would be overkill.
+     * Reading flow-file contents only when it's ready to send in a streaming 
way is ideal.
+     * </p>
+     *
+     * <p>
+     * Additionally, the way proxy authentication is done is vary among Proxy 
server software.
+     * Some requires 407 and resend cycle for every requests, while others 
keep a connection between a client and
+     * the proxy server, then consecutive requests skip auth steps.
+     * The problem is, that how should we behave is only told after sending a 
request to the proxy.
+     * </p>
+     *
+     * In order to handle above concerns correctly and efficiently, this 
method do the followings:
+     *
+     * <ol>
+     * <li>Send a GET request to controller resource, to initiate an 
HttpAsyncClient. The instance will be used for further requests.
+     *      This is not required by the Site-to-Site protocol, but it can 
setup proxy auth state safely.</li>
+     * <li>Send a POST request to initiate a transaction. While doing so, it 
captures how a proxy server works.
+     * If 407 and resend cycle occurs here, it implies that we need to do the 
same thing again when we actually send the data.
+     * Because if the proxy keeps using the same connection and doesn't 
require an auth step, it doesn't do so here.</li>
+     * <li>Then this method stores whether the final POST request should wait 
for the auth step.
+     * So that {@link #openConnectionForSend} can determine when to produce 
contents.</li>
+     * </ol>
+     *
+     * <p>
+     * The above special sequence is only executed when a proxy instance is 
set, and its username is set.
+     * </p>
+     *
+     * @param post a POST request to establish transaction
+     * @return POST request response
+     * @throws IOException thrown if the post request failed
+     */
+    private HttpResponse initiateTransactionForSend(final HttpPost post) 
throws IOException {
+        if (shouldCheckProxyAuth()) {
+            final CloseableHttpAsyncClient asyncClient = getHttpAsyncClient();
+            final HttpGet get = createGetControllerRequest();
+            final Future<HttpResponse> getResult = asyncClient.execute(get, 
null);
+            try {
+                final HttpResponse getResponse = 
getResult.get(readTimeoutMillis, TimeUnit.MILLISECONDS);
+                logger.debug("Proxy auth check has done. getResponse={}", 
getResponse.getStatusLine());
+            } catch (final ExecutionException e) {
+                logger.debug("Something has happened at get controller 
requesting thread for proxy auth check. {}", e.getMessage());
+                throw toIOException(e);
+            } catch (TimeoutException | InterruptedException e) {
+                throw new IOException(e);
             }
-            logger.debug("initiateTransaction handshaking finished, 
transactionUrl={}", transactionUrl);
-            return transactionUrl;
         }
 
+        final HttpAsyncRequestProducer asyncRequestProducer = new 
HttpAsyncRequestProducer() {
+            private boolean requestHasBeenReset = false;
+
+            @Override
+            public HttpHost getTarget() {
+                return URIUtils.extractHost(post.getURI());
+            }
+
+            @Override
+            public HttpRequest generateRequest() throws IOException, 
HttpException {
+                final BasicHttpEntity entity = new BasicHttpEntity();
+                post.setEntity(entity);
+                return post;
+            }
+
+            @Override
+            public void produceContent(ContentEncoder encoder, IOControl 
ioctrl) throws IOException {
+                encoder.complete();
+                if (shouldCheckProxyAuth() && requestHasBeenReset) {
+                    logger.debug("Produced content again, assuming the proxy 
server requires authentication.");
+                    proxyAuthRequiresResend.set(true);
+                }
+            }
+
+            @Override
+            public void requestCompleted(HttpContext context) {
+                debugProxyAuthState(context);
+            }
+
+            @Override
+            public void failed(Exception ex) {
+                logger.error("Create transaction for {} has failed", 
post.getURI(), ex);
+            }
+
+            @Override
+            public boolean isRepeatable() {
+                return true;
+            }
+
+            @Override
+            public void resetRequest() throws IOException {
+                requestHasBeenReset = true;
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+        };
+
+        final Future<HttpResponse> responseFuture = 
getHttpAsyncClient().execute(asyncRequestProducer, new 
BasicAsyncResponseConsumer(), null);
+        final HttpResponse response;
+        try {
+            response = responseFuture.get(readTimeoutMillis, 
TimeUnit.MILLISECONDS);
+
+        } catch (final ExecutionException e) {
+            logger.debug("Something has happened at initiate transaction 
requesting thread. {}", e.getMessage());
+            throw toIOException(e);
+        } catch (TimeoutException | InterruptedException e) {
+            throw new IOException(e);
+        }
+        return response;
+    }
+
+    /**
+     * Print AuthState in HttpContext for debugging purpose.
+     * <p>
+     * If the proxy server requires 407 and resend cycle, this method logs as 
followings, for Basic Auth:
+     * <ul><li>state:UNCHALLENGED;</li>
+     * <li>state:CHALLENGED;auth scheme:basic;credentials present</li></ul>
+     * </p>
+     * <p>
+     * For Digest Auth:
+     * <ul><li>state:UNCHALLENGED;</li>
+     * <li>state:CHALLENGED;auth scheme:digest;credentials present</li></ul>
+     * </p>
+     * <p>
+     * But if the proxy uses the same connection, it doesn't return 407, in 
such case
+     * this method is called only once with:
+     * <ul><li>state:UNCHALLENGED</li></ul>
+     * </p>
+     */
+    private void debugProxyAuthState(HttpContext context) {
+        final AuthState proxyAuthState;
+        if (shouldCheckProxyAuth()
+                && logger.isDebugEnabled()
+                && (proxyAuthState = 
(AuthState)context.getAttribute("http.auth.proxy-scope")) != null){
+            logger.debug("authProxyScope={}", proxyAuthState);
+        }
+    }
+
+    private IOException toIOException(ExecutionException e) {
+        final Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+            return (IOException) cause;
+        } else {
+            return new IOException(cause);
+        }
+    }
+
+    private boolean shouldCheckProxyAuth() {
+        return proxy != null && !isEmpty(proxy.getUsername());
     }
 
     public boolean openConnectionForReceive(final String transactionUrl, final 
Peer peer) throws IOException {
@@ -464,9 +647,13 @@ public class SiteToSiteRestApiClient implements Closeable {
         final HttpAsyncRequestProducer asyncRequestProducer = new 
HttpAsyncRequestProducer() {
 
             private final ByteBuffer buffer = 
ByteBuffer.allocate(DATA_PACKET_CHANNEL_READ_BUFFER_SIZE);
+
             private int totalRead = 0;
             private int totalProduced = 0;
 
+            private boolean requestHasBeenReset = false;
+
+
             @Override
             public HttpHost getTarget() {
                 return URIUtils.extractHost(requestUri);
@@ -490,9 +677,22 @@ public class SiteToSiteRestApiClient implements Closeable {
 
             private final AtomicBoolean bufferHasRemainingData = new 
AtomicBoolean(false);
 
+            /**
+             * If the proxy server requires authentication, the same POST 
request has to be sent again.
+             * The first request will result 407, then the next one will be 
sent with auth headers and actual data.
+             * This method produces a content only when it's need to be sent, 
to avoid producing the flow-file contents twice.
+             * Whether we need to wait auth is determined heuristically by the 
previous POST request which creates transaction.
+             * See {@link 
SiteToSiteRestApiClient#initiateTransactionForSend(HttpPost)} for further 
detail.
+             */
             @Override
             public void produceContent(final ContentEncoder encoder, final 
IOControl ioControl) throws IOException {
 
+                if (shouldCheckProxyAuth() && proxyAuthRequiresResend.get() && 
!requestHasBeenReset) {
+                    logger.debug("Need authentication with proxy server. 
Postpone producing content.");
+                    encoder.complete();
+                    return;
+                }
+
                 if (bufferHasRemainingData.get()) {
                     // If there's remaining buffer last time, send it first.
                     writeBuffer(encoder);
@@ -546,6 +746,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             @Override
             public void requestCompleted(final HttpContext context) {
                 logger.debug("Sending data to {} completed.", flowFilesPath);
+                debugProxyAuthState(context);
             }
 
             @Override
@@ -562,6 +763,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             @Override
             public void resetRequest() throws IOException {
                 logger.debug("Sending data request to {} has been reset...", 
flowFilesPath);
+                requestHasBeenReset = true;
             }
 
             @Override
@@ -617,13 +819,8 @@ public class SiteToSiteRestApiClient implements Closeable {
         try {
             response = postResult.get(readTimeoutMillis, 
TimeUnit.MILLISECONDS);
         } catch (final ExecutionException e) {
-            logger.debug("Something has happened at sending thread. {}", 
e.getMessage());
-            final Throwable cause = e.getCause();
-            if (cause instanceof IOException) {
-                throw (IOException) cause;
-            } else {
-                throw new IOException(cause);
-            }
+            logger.debug("Something has happened at sending data thread. {}", 
e.getMessage());
+            throw toIOException(e);
         } catch (TimeoutException | InterruptedException e) {
             throw new IOException(e);
         }
@@ -765,7 +962,7 @@ public class SiteToSiteRestApiClient implements Closeable {
         }
     }
 
-    private String readTransactionUrl(final CloseableHttpResponse response) {
+    private String readTransactionUrl(final HttpResponse response) {
         final Header locationUriIntentHeader = 
response.getFirstHeader(LOCATION_URI_INTENT_NAME);
         logger.debug("locationUriIntentHeader={}", locationUriIntentHeader);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/67130119/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
index 3a1b1bd..1dcca2c 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
@@ -29,6 +29,7 @@ import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpHeaders;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
@@ -55,6 +56,9 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.littleshoot.proxy.HttpProxyServer;
+import org.littleshoot.proxy.ProxyAuthenticator;
+import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +70,7 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.ServerSocket;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.HashMap;
@@ -95,6 +100,9 @@ public class TestHttpClient {
     private static ServerConnector sslConnector;
     final private static AtomicBoolean isTestCaseFinished = new 
AtomicBoolean(false);
 
+    private static HttpProxyServer proxyServer;
+    private static HttpProxyServer proxyServerWithAuth;
+
     private static Set<PortDTO> inputPorts;
     private static Set<PortDTO> outputPorts;
     private static Set<PeerDTO> peers;
@@ -451,12 +459,64 @@ public class TestHttpClient {
         server.start();
 
         logger.info("Starting server on port {} for HTTP, and {} for HTTPS", 
httpConnector.getLocalPort(), sslConnector.getLocalPort());
+
+        startProxyServer();
+        startProxyServerWithAuth();
+    }
+
+    private static void startProxyServer() throws IOException {
+        int proxyServerPort;
+        try (final ServerSocket serverSocket = new ServerSocket(0)) {
+            proxyServerPort = serverSocket.getLocalPort();
+        }
+        proxyServer = DefaultHttpProxyServer.bootstrap()
+                .withPort(proxyServerPort)
+                .withAllowLocalOnly(true)
+                .start();
+    }
+
+    private static final String PROXY_USER = "proxy user";
+    private static final String PROXY_PASSWORD = "proxy password";
+    private static void startProxyServerWithAuth() throws IOException {
+        int proxyServerPort;
+        try (final ServerSocket serverSocket = new ServerSocket(0)) {
+            proxyServerPort = serverSocket.getLocalPort();
+        }
+        proxyServerWithAuth = DefaultHttpProxyServer.bootstrap()
+                .withPort(proxyServerPort)
+                .withAllowLocalOnly(true)
+                .withProxyAuthenticator(new ProxyAuthenticator() {
+                    @Override
+                    public boolean authenticate(String userName, String 
password) {
+                        return PROXY_USER.equals(userName) && 
PROXY_PASSWORD.equals(password);
+                    }
+
+                    @Override
+                    public String getRealm() {
+                        return "NiFi Unit Test";
+                    }
+                })
+                .start();
     }
 
     @AfterClass
     public static void teardown() throws Exception {
-        logger.info("Stopping server.");
-        server.stop();
+        logger.info("Stopping servers.");
+        try {
+            server.stop();
+        } catch (Exception e) {
+            logger.error("Failed to stop Jetty server due to " + e, e);
+        }
+        try {
+            proxyServer.stop();
+        } catch (Exception e) {
+            logger.error("Failed to stop Proxy server due to " + e, e);
+        }
+        try {
+            proxyServerWithAuth.stop();
+        } catch (Exception e) {
+            logger.error("Failed to stop Proxy server with auth due to " + e, 
e);
+        }
     }
 
     private static class DataPacketBuilder {
@@ -486,7 +546,6 @@ public class TestHttpClient {
         
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", 
"TRACE");
         
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction",
 "DEBUG");
 
-        final URI uri = server.getURI();
         isTestCaseFinished.set(false);
 
         final PeerDTO peer = new PeerDTO();
@@ -648,68 +707,95 @@ public class TestHttpClient {
         }
     }
 
+    private void testSend(SiteToSiteClient client) throws Exception {
+        final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
+
+        assertNotNull(transaction);
+
+        serverChecksum = "1071206772";
+
+
+        for (int i = 0; i < 20; i++) {
+            DataPacket packet = new DataPacketBuilder()
+                    .contents("Example contents from client.")
+                    .attr("Client attr 1", "Client attr 1 value")
+                    .attr("Client attr 2", "Client attr 2 value")
+                    .build();
+            transaction.send(packet);
+            long written = 
((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
+            logger.info("{}: {} bytes have been written.", i, written);
+        }
+
+        transaction.confirm();
+
+        transaction.complete();
+    }
+
     @Test
     public void testSendSuccess() throws Exception {
 
         try (
-            SiteToSiteClient client = getDefaultBuilder()
-                .portName("input-running")
-                .build()
+                final SiteToSiteClient client = getDefaultBuilder()
+                    .portName("input-running")
+                    .build()
         ) {
-            final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
-
-            assertNotNull(transaction);
-
-            serverChecksum = "1071206772";
-
+            testSend(client);
+        }
 
-            for (int i = 0; i < 20; i++) {
-                DataPacket packet = new DataPacketBuilder()
-                        .contents("Example contents from client.")
-                        .attr("Client attr 1", "Client attr 1 value")
-                        .attr("Client attr 2", "Client attr 2 value")
-                        .build();
-                transaction.send(packet);
-                long written = 
((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
-                logger.info("{}: {} bytes have been written.", i, written);
-            }
+    }
 
-            transaction.confirm();
+    @Test
+    public void testSendSuccessWithProxy() throws Exception {
 
-            transaction.complete();
+        try (
+                final SiteToSiteClient client = getDefaultBuilder()
+                        .portName("input-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServer.getListenAddress().getPort(), null, null))
+                        .build()
+        ) {
+            testSend(client);
         }
 
     }
 
     @Test
-    public void testSendSuccessHTTPS() throws Exception {
+    public void testSendProxyAuthFailed() throws Exception {
 
         try (
-                SiteToSiteClient client = getDefaultBuilderHTTPS()
+                final SiteToSiteClient client = getDefaultBuilder()
                         .portName("input-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServerWithAuth.getListenAddress().getPort(), null, null))
                         .build()
         ) {
             final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
+            assertNull("createTransaction should fail at peer selection and 
return null.", transaction);
+        }
 
-            assertNotNull(transaction);
+    }
 
-            serverChecksum = "1071206772";
+    @Test
+    public void testSendSuccessWithProxyAuth() throws Exception {
 
+        try (
+                final SiteToSiteClient client = getDefaultBuilder()
+                        .portName("input-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
+                        .build()
+        ) {
+            testSend(client);
+        }
 
-            for (int i = 0; i < 20; i++) {
-                DataPacket packet = new DataPacketBuilder()
-                        .contents("Example contents from client.")
-                        .attr("Client attr 1", "Client attr 1 value")
-                        .attr("Client attr 2", "Client attr 2 value")
-                        .build();
-                transaction.send(packet);
-                long written = 
((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
-                logger.info("{}: {} bytes have been written.", i, written);
-            }
+    }
 
-            transaction.confirm();
+    @Test
+    public void testSendSuccessHTTPS() throws Exception {
 
-            transaction.complete();
+        try (
+                final SiteToSiteClient client = getDefaultBuilderHTTPS()
+                        .portName("input-running")
+                        .build()
+        ) {
+            testSend(client);
         }
 
     }
@@ -755,6 +841,34 @@ public class TestHttpClient {
     }
 
     @Test
+    public void testSendLargeFileHTTPWithProxy() throws Exception {
+
+        try (
+                SiteToSiteClient client = getDefaultBuilder()
+                        .portName("input-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServer.getListenAddress().getPort(), null, null))
+                        .build()
+        ) {
+            testSendLargeFile(client);
+        }
+
+    }
+
+    @Test
+    public void testSendLargeFileHTTPWithProxyAuth() throws Exception {
+
+        try (
+                SiteToSiteClient client = getDefaultBuilder()
+                        .portName("input-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
+                        .build()
+        ) {
+            testSendLargeFile(client);
+        }
+
+    }
+
+    @Test
     public void testSendLargeFileHTTPS() throws Exception {
 
         try (
@@ -768,6 +882,34 @@ public class TestHttpClient {
     }
 
     @Test
+    public void testSendLargeFileHTTPSWithProxy() throws Exception {
+
+        try (
+                SiteToSiteClient client = getDefaultBuilderHTTPS()
+                        .portName("input-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServer.getListenAddress().getPort(), null, null))
+                        .build()
+        ) {
+            testSendLargeFile(client);
+        }
+
+    }
+
+    @Test
+    public void testSendLargeFileHTTPSWithProxyAuth() throws Exception {
+
+        try (
+                SiteToSiteClient client = getDefaultBuilderHTTPS()
+                        .portName("input-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
+                        .build()
+        ) {
+            testSendLargeFile(client);
+        }
+
+    }
+
+    @Test
     public void testSendSuccessCompressed() throws Exception {
 
         try (
@@ -947,6 +1089,19 @@ public class TestHttpClient {
         }
     }
 
+    private void testReceive(SiteToSiteClient client) throws IOException {
+        final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
+
+        assertNotNull(transaction);
+
+        DataPacket packet;
+        while ((packet = transaction.receive()) != null) {
+            consumeDataPacket(packet);
+        }
+        transaction.confirm();
+        transaction.complete();
+    }
+
     @Test
     public void testReceiveSuccess() throws Exception {
 
@@ -955,16 +1110,33 @@ public class TestHttpClient {
                 .portName("output-running")
                 .build()
         ) {
-            final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
+            testReceive(client);
+        }
+    }
 
-            assertNotNull(transaction);
+    @Test
+    public void testReceiveSuccessWithProxy() throws Exception {
 
-            DataPacket packet;
-            while ((packet = transaction.receive()) != null) {
-                consumeDataPacket(packet);
-            }
-            transaction.confirm();
-            transaction.complete();
+        try (
+                SiteToSiteClient client = getDefaultBuilder()
+                        .portName("output-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServer.getListenAddress().getPort(), null, null))
+                        .build()
+        ) {
+            testReceive(client);
+        }
+    }
+
+    @Test
+    public void testReceiveSuccessWithProxyAuth() throws Exception {
+
+        try (
+                SiteToSiteClient client = getDefaultBuilder()
+                        .portName("output-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
+                        .build()
+        ) {
+            testReceive(client);
         }
     }
 
@@ -976,16 +1148,33 @@ public class TestHttpClient {
                         .portName("output-running")
                         .build()
         ) {
-            final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
+            testReceive(client);
+        }
+    }
 
-            assertNotNull(transaction);
+    @Test
+    public void testReceiveSuccessHTTPSWithProxy() throws Exception {
 
-            DataPacket packet;
-            while ((packet = transaction.receive()) != null) {
-                consumeDataPacket(packet);
-            }
-            transaction.confirm();
-            transaction.complete();
+        try (
+                SiteToSiteClient client = getDefaultBuilderHTTPS()
+                        .portName("output-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServer.getListenAddress().getPort(), null, null))
+                        .build()
+        ) {
+            testReceive(client);
+        }
+    }
+
+    @Test
+    public void testReceiveSuccessHTTPSWithProxyAuth() throws Exception {
+
+        try (
+                SiteToSiteClient client = getDefaultBuilderHTTPS()
+                        .portName("output-running")
+                        .httpProxy(new HttpProxy("localhost", 
proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
+                        .build()
+        ) {
+            testReceive(client);
         }
     }
 
@@ -998,16 +1187,7 @@ public class TestHttpClient {
                         .useCompression(true)
                         .build()
         ) {
-            final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
-
-            assertNotNull(transaction);
-
-            DataPacket packet;
-            while ((packet = transaction.receive()) != null) {
-                consumeDataPacket(packet);
-            }
-            transaction.confirm();
-            transaction.complete();
+            testReceive(client);
         }
     }
 

Reply via email to