Repository: nifi
Updated Branches:
  refs/heads/master 02071103d -> a91984446


NIFI-2567: Site-to-Site to send large data via HTTPS

- Site-to-Site over HTTPS couldn't send data larger than about 7KB due to
  misuse of the httpasyncclient library
- Updated httpasyncclient from 4.1.1 to 4.1.2
- Let the httpasyncclient framework call produceContent multiple times as
  it becomes ready to send more data via the SSL session
- Added HTTPS test cases to TestHttpClient, which failed without this
  fix


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a9198444
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a9198444
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a9198444

Branch: refs/heads/master
Commit: a919844461d63a26fa6c1d8c7daa447cd5ef912e
Parents: 0207110
Author: Koji Kawamura <[email protected]>
Authored: Sun Aug 14 22:01:52 2016 +0900
Committer: Mark Payne <[email protected]>
Committed: Fri Aug 19 14:24:53 2016 -0400

----------------------------------------------------------------------
 nifi-commons/nifi-site-to-site-client/pom.xml   |   2 +-
 .../remote/util/SiteToSiteRestApiClient.java    |  65 ++++--
 .../nifi/remote/client/http/TestHttpClient.java | 218 +++++++++++++++----
 .../src/test/resources/certs/localhost-ks.jks   | Bin 0 -> 3512 bytes
 .../src/test/resources/certs/localhost-ts.jks   | Bin 0 -> 1816 bytes
 5 files changed, 219 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml 
b/nifi-commons/nifi-site-to-site-client/pom.xml
index 63e5c12..c1857b5 100644
--- a/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -55,7 +55,7 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpasyncclient</artifactId>
-            <version>4.1.1</version>
+            <version>4.1.2</version>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 8a379b7..d228378 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -112,6 +112,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
@@ -463,6 +464,8 @@ public class SiteToSiteRestApiClient implements Closeable {
         final HttpAsyncRequestProducer asyncRequestProducer = new 
HttpAsyncRequestProducer() {
 
             private final ByteBuffer buffer = 
ByteBuffer.allocate(DATA_PACKET_CHANNEL_READ_BUFFER_SIZE);
+            private int totalRead = 0;
+            private int totalProduced = 0;
 
             @Override
             public HttpHost getTarget() {
@@ -485,45 +488,61 @@ public class SiteToSiteRestApiClient implements Closeable 
{
                 return post;
             }
 
+            private final AtomicBoolean bufferHasRemainingData = new 
AtomicBoolean(false);
+
             @Override
             public void produceContent(final ContentEncoder encoder, final 
IOControl ioControl) throws IOException {
 
-                int totalRead = 0;
-                int totalProduced = 0;
+                if (bufferHasRemainingData.get()) {
+                    // If there's remaining buffer last time, send it first.
+                    writeBuffer(encoder);
+                    if (bufferHasRemainingData.get()) {
+                        return;
+                    }
+                }
+
                 int read;
                 // This read() blocks until data becomes available,
                 // or corresponding outputStream is closed.
-                while ((read = dataPacketChannel.read(buffer)) > -1) {
+                if ((read = dataPacketChannel.read(buffer)) > -1) {
 
-                    buffer.flip();
-                    while (buffer.hasRemaining()) {
-                        totalProduced += encoder.write(buffer);
-                    }
-                    buffer.clear();
                     logger.trace("Read {} bytes from dataPacketChannel. {}", 
read, flowFilesPath);
                     totalRead += read;
 
-                }
+                    buffer.flip();
+                    writeBuffer(encoder);
 
-                // There might be remaining bytes in buffer. Make sure it's 
fully drained.
-                buffer.flip();
-                while (buffer.hasRemaining()) {
-                    totalProduced += encoder.write(buffer);
-                }
+                } else {
 
-                final long totalWritten = 
commSession.getOutput().getBytesWritten();
-                logger.debug("sending data to {} has reached to its end. 
produced {} bytes by reading {} bytes from channel. {} bytes written in this 
transaction.",
-                    flowFilesPath, totalProduced, totalRead, totalWritten);
-                if (totalRead != totalWritten || totalProduced != 
totalWritten) {
-                    final String msg = "Sending data to %s has reached to its 
end, but produced : read : wrote byte sizes (%d : $d : %d) were not equal. 
Something went wrong.";
-                    throw new RuntimeException(String.format(msg, 
flowFilesPath, totalProduced, totalRead, totalWritten));
+                    final long totalWritten = 
commSession.getOutput().getBytesWritten();
+                    logger.debug("sending data to {} has reached to its end. 
produced {} bytes by reading {} bytes from channel. {} bytes written in this 
transaction.",
+                            flowFilesPath, totalProduced, totalRead, 
totalWritten);
+                    if (totalRead != totalWritten || totalProduced != 
totalWritten) {
+                        final String msg = "Sending data to %s has reached to 
its end, but produced : read : wrote byte sizes (%d : %d : %d) were not equal. 
Something went wrong.";
+                        throw new RuntimeException(String.format(msg, 
flowFilesPath, totalProduced, totalRead, totalWritten));
+                    }
+                    transferDataLatch.countDown();
+                    encoder.complete();
+                    dataPacketChannel.close();
                 }
-                transferDataLatch.countDown();
-                encoder.complete();
-                dataPacketChannel.close();
 
             }
 
+            private void writeBuffer(ContentEncoder encoder) throws 
IOException {
+                while (buffer.hasRemaining()) {
+                    final int written = encoder.write(buffer);
+                    logger.trace("written {} bytes to encoder.", written);
+                    if (written == 0) {
+                        logger.trace("Buffer still has remaining. {}", buffer);
+                        bufferHasRemainingData.set(true);
+                        return;
+                    }
+                    totalProduced += written;
+                }
+                bufferHasRemainingData.set(false);
+                buffer.clear();
+            }
+
             @Override
             public void requestCompleted(final HttpContext context) {
                 logger.debug("Sending data to {} completed.", flowFilesPath);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
index 3f6dd89..3a1b1bd 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
@@ -20,6 +20,7 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.KeystoreType;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
 import org.apache.nifi.remote.io.CompressionInputStream;
@@ -39,9 +40,16 @@ import org.apache.nifi.web.api.entity.ControllerEntity;
 import org.apache.nifi.web.api.entity.PeersEntity;
 import org.apache.nifi.web.api.entity.TransactionResultEntity;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
 import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -83,11 +91,14 @@ public class TestHttpClient {
     private static Logger logger = 
LoggerFactory.getLogger(TestHttpClient.class);
 
     private static Server server;
+    private static ServerConnector httpConnector;
+    private static ServerConnector sslConnector;
     final private static AtomicBoolean isTestCaseFinished = new 
AtomicBoolean(false);
 
     private static Set<PortDTO> inputPorts;
     private static Set<PortDTO> outputPorts;
     private static Set<PeerDTO> peers;
+    private static Set<PeerDTO> peersSecure;
     private static String serverChecksum;
 
     public static class SiteInfoServlet extends HttpServlet {
@@ -96,11 +107,18 @@ public class TestHttpClient {
         protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
 
             final ControllerDTO controller = new ControllerDTO();
-            
controller.setRemoteSiteHttpListeningPort(server.getURI().getPort());
+
+            if (req.getLocalPort() == httpConnector.getLocalPort()) {
+                
controller.setRemoteSiteHttpListeningPort(httpConnector.getLocalPort());
+                controller.setSiteToSiteSecure(false);
+            } else {
+                
controller.setRemoteSiteHttpListeningPort(sslConnector.getLocalPort());
+                controller.setSiteToSiteSecure(true);
+            }
+
             controller.setId("remote-controller-id");
             controller.setInstanceId("remote-instance-id");
             controller.setName("Remote NiFi Flow");
-            controller.setSiteToSiteSecure(false);
 
             assertNotNull("Test case should set <inputPorts> depending on the 
test scenario.", inputPorts);
             controller.setInputPorts(inputPorts);
@@ -124,8 +142,13 @@ public class TestHttpClient {
 
             final PeersEntity peersEntity = new PeersEntity();
 
-            assertNotNull("Test case should set <peers> depending on the test 
scenario.", peers);
-            peersEntity.setPeers(peers);
+            if (req.getLocalPort() == httpConnector.getLocalPort()) {
+                assertNotNull("Test case should set <peers> depending on the 
test scenario.", peers);
+                peersEntity.setPeers(peers);
+            } else {
+                assertNotNull("Test case should set <peersSecure> depending on 
the test scenario.", peersSecure);
+                peersEntity.setPeers(peersSecure);
+            }
 
             respondWithJson(resp, peersEntity);
         }
@@ -383,6 +406,21 @@ public class TestHttpClient {
         ServletHandler servletHandler = new ServletHandler();
         contextHandler.insertHandler(servletHandler);
 
+        SslContextFactory sslContextFactory = new SslContextFactory();
+        
sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks");
+        sslContextFactory.setKeyStorePassword("localtest");
+        sslContextFactory.setKeyStoreType("JKS");
+
+        httpConnector = new ServerConnector(server);
+
+        HttpConfiguration https = new HttpConfiguration();
+        https.addCustomizer(new SecureRequestCustomizer());
+        sslConnector = new ServerConnector(server,
+                new SslConnectionFactory(sslContextFactory, "http/1.1"),
+                new HttpConnectionFactory(https));
+
+        server.setConnectors(new Connector[] { httpConnector, sslConnector });
+
         servletHandler.addServletWithMapping(SiteInfoServlet.class, 
"/site-to-site");
         servletHandler.addServletWithMapping(PeersServlet.class, 
"/site-to-site/peers");
 
@@ -412,8 +450,7 @@ public class TestHttpClient {
 
         server.start();
 
-        int serverPort = server.getURI().getPort();
-        logger.info("Starting server on port {}", serverPort);
+        logger.info("Starting server on port {} for HTTP, and {} for HTTPS", 
httpConnector.getLocalPort(), sslConnector.getLocalPort());
     }
 
     @AfterClass
@@ -450,17 +487,26 @@ public class TestHttpClient {
         
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction",
 "DEBUG");
 
         final URI uri = server.getURI();
+        isTestCaseFinished.set(false);
+
         final PeerDTO peer = new PeerDTO();
-        peer.setHostname(uri.getHost());
-        peer.setPort(uri.getPort());
+        peer.setHostname("localhost");
+        peer.setPort(httpConnector.getLocalPort());
         peer.setFlowFileCount(10);
         peer.setSecure(false);
 
-        isTestCaseFinished.set(false);
-
         peers = new HashSet<>();
         peers.add(peer);
 
+        final PeerDTO peerSecure = new PeerDTO();
+        peerSecure.setHostname("localhost");
+        peerSecure.setPort(sslConnector.getLocalPort());
+        peerSecure.setFlowFileCount(10);
+        peerSecure.setSecure(true);
+
+        peersSecure = new HashSet<>();
+        peersSecure.add(peerSecure);
+
         inputPorts = new HashSet<>();
 
         final PortDTO runningInputPort = new PortDTO();
@@ -522,9 +568,20 @@ public class TestHttpClient {
     }
 
     private SiteToSiteClient.Builder getDefaultBuilder() {
-        final URI uri = server.getURI();
         return new 
SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP)
-                .url("http://" + uri.getHost() + ":" + uri.getPort() + "/nifi")
+                .url("http://localhost:" + httpConnector.getLocalPort() + 
"/nifi")
+                ;
+    }
+
+    private SiteToSiteClient.Builder getDefaultBuilderHTTPS() {
+        return new 
SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP)
+                .url("https://localhost:" + sslConnector.getLocalPort() + 
"/nifi")
+                .keystoreFilename("src/test/resources/certs/localhost-ks.jks")
+                .keystorePass("localtest")
+                .keystoreType(KeystoreType.JKS)
+                
.truststoreFilename("src/test/resources/certs/localhost-ts.jks")
+                .truststorePass("localtest")
+                .truststoreType(KeystoreType.JKS)
                 ;
     }
 
@@ -594,9 +651,6 @@ public class TestHttpClient {
     @Test
     public void testSendSuccess() throws Exception {
 
-        final URI uri = server.getURI();
-
-        logger.info("uri={}", uri);
         try (
             SiteToSiteClient client = getDefaultBuilder()
                 .portName("input-running")
@@ -628,11 +682,94 @@ public class TestHttpClient {
     }
 
     @Test
-    public void testSendSuccessCompressed() throws Exception {
+    public void testSendSuccessHTTPS() throws Exception {
 
-        final URI uri = server.getURI();
+        try (
+                SiteToSiteClient client = getDefaultBuilderHTTPS()
+                        .portName("input-running")
+                        .build()
+        ) {
+            final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
+
+            assertNotNull(transaction);
+
+            serverChecksum = "1071206772";
+
+
+            for (int i = 0; i < 20; i++) {
+                DataPacket packet = new DataPacketBuilder()
+                        .contents("Example contents from client.")
+                        .attr("Client attr 1", "Client attr 1 value")
+                        .attr("Client attr 2", "Client attr 2 value")
+                        .build();
+                transaction.send(packet);
+                long written = 
((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
+                logger.info("{}: {} bytes have been written.", i, written);
+            }
+
+            transaction.confirm();
+
+            transaction.complete();
+        }
+
+    }
+
+    private static void testSendLargeFile(SiteToSiteClient client) throws 
IOException {
+        final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
+
+        assertNotNull(transaction);
+
+        serverChecksum = "1527414060";
+
+        final int contentSize = 10_000;
+        final StringBuilder sb = new StringBuilder(contentSize);
+        for (int i = 0; i < contentSize; i++) {
+            sb.append("a");
+        }
+
+        DataPacket packet = new DataPacketBuilder()
+                .contents(sb.toString())
+                .attr("Client attr 1", "Client attr 1 value")
+                .attr("Client attr 2", "Client attr 2 value")
+                .build();
+        transaction.send(packet);
+        long written = 
((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
+        logger.info("{} bytes have been written.", written);
+
+        transaction.confirm();
+
+        transaction.complete();
+    }
+
+    @Test
+    public void testSendLargeFileHTTP() throws Exception {
+
+        try (
+                SiteToSiteClient client = getDefaultBuilder()
+                        .portName("input-running")
+                        .build()
+        ) {
+            testSendLargeFile(client);
+        }
+
+    }
+
+    @Test
+    public void testSendLargeFileHTTPS() throws Exception {
+
+        try (
+                SiteToSiteClient client = getDefaultBuilderHTTPS()
+                        .portName("input-running")
+                        .build()
+        ) {
+            testSendLargeFile(client);
+        }
+
+    }
+
+    @Test
+    public void testSendSuccessCompressed() throws Exception {
 
-        logger.info("uri={}", uri);
         try (
                 SiteToSiteClient client = getDefaultBuilder()
                         .portName("input-running")
@@ -667,9 +804,6 @@ public class TestHttpClient {
     @Test
     public void testSendSlowClientSuccess() throws Exception {
 
-        final URI uri = server.getURI();
-
-        logger.info("uri={}", uri);
         try (
                 SiteToSiteClient client = getDefaultBuilder()
                         .idleExpiration(1000, TimeUnit.MILLISECONDS)
@@ -722,9 +856,6 @@ public class TestHttpClient {
     @Test
     public void testSendTimeout() throws Exception {
 
-        final URI uri = server.getURI();
-
-        logger.info("uri={}", uri);
         try (
             SiteToSiteClient client = getDefaultBuilder()
                 .timeout(1, TimeUnit.SECONDS)
@@ -761,9 +892,6 @@ public class TestHttpClient {
 
         
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction",
 "INFO");
 
-        final URI uri = server.getURI();
-
-        logger.info("uri={}", uri);
         try (
                 SiteToSiteClient client = getDefaultBuilder()
                         .idleExpiration(500, TimeUnit.MILLISECONDS)
@@ -822,9 +950,6 @@ public class TestHttpClient {
     @Test
     public void testReceiveSuccess() throws Exception {
 
-        final URI uri = server.getURI();
-
-        logger.info("uri={}", uri);
         try (
             SiteToSiteClient client = getDefaultBuilder()
                 .portName("output-running")
@@ -844,11 +969,29 @@ public class TestHttpClient {
     }
 
     @Test
-    public void testReceiveSuccessCompressed() throws Exception {
+    public void testReceiveSuccessHTTPS() throws Exception {
 
-        final URI uri = server.getURI();
+        try (
+                SiteToSiteClient client = getDefaultBuilderHTTPS()
+                        .portName("output-running")
+                        .build()
+        ) {
+            final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
+
+            assertNotNull(transaction);
+
+            DataPacket packet;
+            while ((packet = transaction.receive()) != null) {
+                consumeDataPacket(packet);
+            }
+            transaction.confirm();
+            transaction.complete();
+        }
+    }
+
+    @Test
+    public void testReceiveSuccessCompressed() throws Exception {
 
-        logger.info("uri={}", uri);
         try (
                 SiteToSiteClient client = getDefaultBuilder()
                         .portName("output-running")
@@ -871,9 +1014,6 @@ public class TestHttpClient {
     @Test
     public void testReceiveSlowClientSuccess() throws Exception {
 
-        final URI uri = server.getURI();
-
-        logger.info("uri={}", uri);
         try (
                 SiteToSiteClient client = getDefaultBuilder()
                         .portName("output-running")
@@ -896,9 +1036,6 @@ public class TestHttpClient {
     @Test
     public void testReceiveTimeout() throws Exception {
 
-        final URI uri = server.getURI();
-
-        logger.info("uri={}", uri);
         try (
                 SiteToSiteClient client = getDefaultBuilder()
                         .timeout(1, TimeUnit.SECONDS)
@@ -918,9 +1055,6 @@ public class TestHttpClient {
     @Test
     public void testReceiveTimeoutAfterDataExchange() throws Exception {
 
-        final URI uri = server.getURI();
-
-        logger.info("uri={}", uri);
         try (
                 SiteToSiteClient client = getDefaultBuilder()
                         .timeout(1, TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks
 
b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks
new file mode 100755
index 0000000..df36197
Binary files /dev/null and 
b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks
 
b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks
new file mode 100755
index 0000000..7824378
Binary files /dev/null and 
b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks
 differ

Reply via email to